In [None]:
# Display the SparkContext bound to `sc`.
# NOTE(review): `sc` is only assigned further down in this notebook, so on a
# fresh kernel this cell presumably raises NameError unless the kernel
# pre-defines `sc` — confirm against the target environment.
sc

In [None]:
# Number of trees grown by the random forest in buildModel.
RF_NUM_TREES = 300
# Maximum tree depth, shared by the random forest and the GBT classifier.
RF_MAX_DEPTH = 15

In [None]:
import json
import os
import findspark
import numpy as np
from argparse import ArgumentParser
from string import punctuation

In [None]:
# Finds and adds spark to python path
# Convenient for env managers like conda

#findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import IntegerType, StringType
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.mllib.linalg import Vectors
from pyspark.ml.classification import RandomForestClassifier, GBTClassifier, MultilayerPerceptronClassifier, LinearSVC, LogisticRegression
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, FloatType, ArrayType, StringType, DoubleType
from pyspark.ml import Pipeline, PipelineModel

In [None]:
# Creates an initial spark configuration utilizing all local cores.
# This object is handed to the SparkContext created in the next cell.
conf = SparkConf().setMaster("local[*]")

In [None]:
# Creates (or reuses) the spark context through which to process RDD ops.
# getOrCreate keeps this cell re-runnable: constructing a second SparkContext
# in the same process fails when one is already active (e.g. one supplied by
# the kernel, or one left over from an earlier run of this cell).
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
# Builds (or fetches) the SparkSession used for every DataFrame operation below.
# NOTE(review): a SparkContext has already been created from `conf` by this
# point, so getOrCreate() presumably reuses it; the master("local") and
# spark.driver.memory settings given here may therefore have no effect —
# confirm, and consider moving them onto `conf`.
spark = SparkSession.builder\
                    .master("local")\
                    .appName("Word Count")\
                    .config("spark.driver.memory", "6g") \
                    .getOrCreate()

In [None]:
# Data pipeline for csv-->formatted dataframe
def mold(df, labeled=True,numPartitions=48):
    """Shape a raw CSV-loaded DataFrame into the [id, feature..., Y] layout.

    Keeps the second column (the id) plus every column from index 145 onward,
    casts the id to Long and the feature columns to Float and, when `labeled`,
    turns the last selected column into an Integer column named 'Y'.

    Args:
        df: DataFrame as read from the CSV files.
        labeled: True when the last selected column holds the label.
        numPartitions: number of partitions of the returned DataFrame.

    Returns:
        A new DataFrame with columns [id, feature..., Y] (Y only if `labeled`).
    """
    # Select relevant id+features
    df = df.select([df.columns[1]]+df.columns[145:])

    # Recast id, and rename it when it carries the raw "Face ID" header
    id_name = df.schema.names[0]
    df = df.withColumn(id_name,col(id_name).cast("Long")).withColumnRenamed("Face ID", "face_id")

    names = df.schema.names
    feature_names = names[1:-1] if labeled else names[1:]

    # Recast all features in ONE projection. Calling withColumn once per
    # feature adds a projection per call and inflates the query plan.
    projection = [col(names[0])]
    projection += [col(name).cast("Float").alias(name) for name in feature_names]

    # Recast the label as 'Y' (if appropriate). The label is picked by
    # position, so the raw label column is left out of the result whatever
    # its header is called.
    if labeled:
        projection.append(col(names[-1]).cast("Integer").alias('Y'))

    df = df.select(projection)

    # Repartition by round-tripping through the RDD API.
    return spark.createDataFrame(df.rdd.repartition(numPartitions))

In [None]:
# Creates handlers for the spark-loaded files. Spark evaluates lazily, so the
# data itself is only read into memory once a downstream task requires it.
bucket='gs://uga-dsp/project2/files/'
_csv_options={'format':"csv",'header':True}
#_train=spark.read.load(bucket+"X_small_train.csv", **_csv_options)
_test=spark.read.load(bucket+"X_small_test.csv", **_csv_options)
big_train=spark.read.load(bucket+"X_train.csv", **_csv_options)
_testA=spark.read.load(bucket+"Xa_test.csv", **_csv_options)
_testB=spark.read.load(bucket+"Xb_test.csv", **_csv_options)
_testC=spark.read.load(bucket+"Xc_test.csv", **_csv_options)

In [None]:
# Loads either training set (or both). Uncomment as needed

#trainingData=mold(_train)
#del trainingData
#trainingDataBig=mold(big_train)

In [None]:
# Molds the test sets: the small labeled one, plus the three unlabeled final
# sets keyed by their letter. Comment out whatever is not needed.
testingData=mold(_test)
testingDataFinal={
    tag: mold(raw, False)
    for tag, raw in (('a', _testA), ('b', _testB), ('c', _testC))
}

In [None]:
def buildModel(train,model_type='rf'):
    """Fit a label-indexer -> classifier -> label-converter pipeline.

    Args:
        train: DataFrame whose first column is the id ('face_id'), whose last
            column is the integer label 'Y' and whose remaining columns are
            numeric features.
        model_type: which classifier to train: 'rf' (random forest), 'gbt'
            (gradient-boosted trees), 'per' (multilayer perceptron), 'svm'
            (linear SVC) or 'lr' (logistic regression).

    Returns:
        The fitted PipelineModel. Its output carries a 'predictedLabel' column
        holding the original label values.

    Raises:
        KeyError: if `model_type` is not one of the supported keys.
    """
    shared = dict(labelCol="indexedLabel", featuresCol="features")

    # Dictionary of model types for easy selection and extension. Every entry
    # is a factory taking (number of features, number of classes) so that only
    # the selected estimator is ever instantiated.
    factories = {
        'rf': lambda n_in, n_out: RandomForestClassifier(numTrees=RF_NUM_TREES, maxDepth=RF_MAX_DEPTH, **shared),
        'gbt': lambda n_in, n_out: GBTClassifier(maxDepth=RF_MAX_DEPTH, maxIter=100, **shared),
        'per': lambda n_in, n_out: MultilayerPerceptronClassifier(maxIter=400, layers=[n_in, 256, 256, n_out], blockSize=128, **shared),
        'svm': lambda n_in, n_out: LinearSVC(maxIter=40, regParam=0.1, **shared),
        'lr': lambda n_in, n_out: LogisticRegression(maxIter=400, regParam=0.0, elasticNetParam=0, **shared),
    }

    # Fail fast on a bad key, before any Spark work is launched.
    if model_type not in factories:
        raise KeyError(f"unknown model_type {model_type!r}; expected one of {sorted(factories)}")

    # Include a bias column for logistic regression (testModel adds the same
    # column, so both sides see the same feature count)
    if model_type=='lr':
        train=train.withColumn("bias", lit(1)).select([train.schema.names[0],'bias']+train.schema.names[1:])

    # Everything between the id (first) and the label (last) is a feature
    feature_names = train.schema.names[1:-1]

    # Assembler for compiling features into a singular dense vector
    train_assembler = VectorAssembler().setInputCols(feature_names).setOutputCol('features')

    # Assembled training data
    trainData=train_assembler.transform(train).selectExpr('face_id','features',"Y")

    # Index labels, adding metadata to the label column.
    # Fit on whole dataset to include all labels in index.
    labelIndexer = StringIndexer(inputCol="Y", outputCol="indexedLabel").fit(trainData)

    # Size the perceptron's output layer from the labels actually present,
    # never below the two outputs that were previously hard-coded.
    num_classes = max(2, len(labelIndexer.labels))
    _model = factories[model_type](len(feature_names), num_classes)

    # Convert indexed labels back to original labels.
    labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",
                                   labels=labelIndexer.labels)

    # Chain indexers and chosen model in a Pipeline
    pipeline = Pipeline(stages=[labelIndexer, _model, labelConverter])

    # Train model.  This also runs the indexers.
    return pipeline.fit(trainData)

In [None]:
def testModel(test,model,model_type='rf',labeled=True):
    """Apply a fitted pipeline to a molded DataFrame and return its predictions.

    Args:
        test: DataFrame whose first column is 'face_id', followed by the
            feature columns and, when `labeled`, a trailing 'Y' column.
        model: fitted pipeline exposing transform().
        model_type: key the model was built with; 'lr' triggers the extra
            bias column.
        labeled: whether `test` carries the 'Y' label column.

    Returns:
        DataFrame with 'face_id', 'predictedLabel' and, when `labeled`, 'Y'.
    """
    # Logistic regression expects the explicit bias column right after the id
    if model_type=='lr':
        original_names=test.schema.names
        test=test.withColumn("bias", lit(1)).select([original_names[0],'bias']+original_names[1:])

    # Pick the feature columns and the input/output layouts in one place,
    # depending on whether a trailing label column is present
    all_names=test.schema.names
    if labeled:
        feature_names=all_names[1:-1]
        kept=['face_id','features','Y']
        reported=['face_id','predictedLabel','Y']
    else:
        feature_names=all_names[1:]
        kept=['face_id','features']
        reported=['face_id','predictedLabel']

    # Compile the features into the single vector column the model consumes
    assembler=VectorAssembler().setInputCols(feature_names).setOutputCol('features')
    assembled=assembler.transform(test).select(*kept)

    # Apply model to data to form prediction
    return model.transform(assembled).select(*reported)
    

In [None]:
# Load a saved PipelineModel from cloud storage (the training call in the next
# cell is commented out); %time reports how long the load takes.
%time model=PipelineModel.load('gs://micky-practicum/rf_model')

In [None]:
#%time model=buildModel(trainingDataBig,model_type='rf')

In [None]:
# Predict on the unlabeled final test set 'a'; the result has the columns
# 'face_id' and 'predictedLabel' (no 'Y', since labeled=False).
output=testModel(testingDataFinal['a'],model,model_type='rf',labeled=False)

In [None]:
# Recast the prediction to an integer and the id to a string
df = (
    output
    .withColumn("predictedLabel", output["predictedLabel"].cast(IntegerType()))
    .withColumn("face_id", output["face_id"].cast(StringType()))
)

In [None]:
# Size of a seeded ~5% sample drawn without replacement
df.sample(withReplacement=False, fraction=0.05, seed=0).count()

In [None]:
# Number of prediction rows, counted through the RDD API
df.rdd.count()

In [None]:
# Collapse to a single partition and write the rows out as text.
# NOTE(review): this presumably writes the string form of each Row (id and
# label together) rather than one bare label per line as saveResults does —
# confirm this is the intended format.
df.rdd.coalesce(1).saveAsTextFile("gs://micky-practicum/ya.txt")

In [None]:
# Exploratory: list the attributes available on the DataFrame.
# NOTE(review): looks like leftover debugging — consider removing.
dir(df)

In [None]:
# Peek at the first prediction as an int. PySpark DataFrames (Spark 2+) have
# no .map of their own, so map over the underlying RDD of Rows instead.
output.rdd.map(lambda x:int(x[1])).take(1)

In [None]:
# Total number of prediction rows
output_count=output.count()

In [None]:
# Bring every predicted label to the driver as a plain list of ints.
# PySpark DataFrames (Spark 2+) have no .map, so go through the RDD of Rows.
local_output=output.rdd.map(lambda x:int(x[1])).collect()

In [None]:
def saveResults(model, model_type='rf'):
    """Predict on the final test sets 'a', 'b', 'c' and save one label per line.

    Writes ya.txt, yb.txt and yc.txt in the current working directory,
    replacing any existing file of the same name.

    Args:
        model: fitted pipeline passed on to testModel.
        model_type: key the model was built with (forwarded to testModel so
            that 'lr' models get their bias column); defaults to 'rf'.
    """
    for s in ['a','b','c']:
        output=testModel(testingDataFinal[s],model,model_type=model_type,labeled=False)
        dest=f'y{s}.txt'
        # Truncate rather than append: re-running must not stack a second
        # copy of the predictions onto an existing file.
        # NOTE(review): mold() repartitions the data, so the collected row
        # order presumably need not match the input file order — confirm
        # whether the consumer of these files relies on line order.
        with open(dest, 'w') as the_file:
            the_file.writelines(f'{row[1]}\n' for row in output.collect())

In [None]:
def eval(out):
    """Return the fraction of rows whose prediction matches the true label.

    Args:
        out: iterable of rows indexable as (id, predicted label, true label);
            the predicted label is converted with int() before comparing.
            Any iterable is accepted, including generators.

    Returns:
        Accuracy in [0, 1]; 0.0 when `out` is empty.

    NOTE(review): this name shadows the builtin eval() for the rest of the
    notebook; it is kept because later cells may call it by this name.
    """
    total=0
    correct=0
    for row in out:
        total+=1
        if int(row[1])==row[2]:
            correct+=1
    # Guard the empty case instead of raising ZeroDivisionError
    return correct/total if total else 0.0

In [None]:
# Keep only the rows whose prediction matches the true label. DataFrame.filter
# takes a Column expression (or SQL string), not a Python lambda, and already
# returns a DataFrame. `output` must come from testModel(..., labeled=True),
# since the 'Y' column is only present in that case.
df=output.filter(col("predictedLabel").cast("int")==col("Y"))

In [None]:
# Persist the current `df` to cloud storage as CSV
df.write.csv('gs://micky-practicum/rf_output_test.csv')

In [None]:
# Ratio of rows in `df` to rows in `output` (the accuracy, when `df` holds the
# correctly predicted rows of `output`)
df.count()/output.count()

In [None]:
# Save output, one predicted label per line. Make sure to name appropriately:
# `dest` must match the test set that `output` was computed from.

dest='yc.txt'
# 'w' rather than 'a': re-running this cell replaces the file instead of
# appending a second copy of the predictions to it.
with open(dest, 'w') as the_file:
    the_file.writelines(f'{row[1]}\n' for row in output.collect())