In [1]:
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


# Build the Spark entry points used by the rest of the notebook.
# getOrCreate() returns the already-running context when this cell is executed
# a second time; constructing SparkContext(...) directly would raise because
# only one context may be active per process.
spark_conf = SparkConf().setMaster('local').setAppName('Neural Nets on Spark')  # if using locally
sc = SparkContext.getOrCreate(spark_conf)
sql_sc = SQLContext(sc)

In [2]:
# Load the engineered-features CSV into pandas first; the conversion to a
# Spark DataFrame happens in a later cell, with an explicit schema.
pandas_df = pd.read_csv(filepath_or_buffer='data_engineered_features500.csv')
#s_df = sql_sc.createDataFrame(pandas_df)

In [3]:
# Encode gender numerically: "m" -> 1, "f" -> 2, anything else (including
# missing values) -> 0.
gender_codes = {"m": 1, "f": 2}
pandas_df["gender_int"] = (
    pandas_df["gender"]
    .map(gender_codes)   # unmapped / missing values become NaN
    .fillna(0)
    .astype("int64")
)

In [4]:
# Remove, in place, the columns that are not passed on to the model.
# `gender` goes too, since the frame already carries the numeric `gender_int`.
columns_to_discard = [
    "userid",
    "track-name",
    "artist-name",
    "timestamp",
    "songlength",
    "gender",
]
pandas_df.drop(columns_to_discard, axis="columns", inplace=True)

In [5]:
# Preview the first five rows of the remaining columns.
pandas_df.head(n=5)

Unnamed: 0,weekday,hour,weekend,daytime,track-total-count,track-weekday-daytime-count,last-seen-song,month,quarter,skipped,artist-total-count,artist-weekday-daytime-count,last-seen-artist,age,gender_int
0,6,13,1,3,1,1,,8,3,0,1,1,,,1
1,6,14,1,3,1,1,,8,3,0,2,2,0.002882,,1
2,6,14,1,3,1,1,,8,3,0,3,3,0.005023,,1
3,6,15,1,3,1,1,,8,3,0,1,1,,,1
4,6,16,1,3,1,1,,8,3,1,3,3,0.000463,,1


In [6]:
# List the column labels in frame order.
pandas_df.columns.tolist()

['weekday',
 'hour',
 'weekend',
 'daytime',
 'track-total-count',
 'track-weekday-daytime-count',
 'last-seen-song',
 'month',
 'quarter',
 'skipped',
 'artist-total-count',
 'artist-weekday-daytime-count',
 'last-seen-artist',
 'age',
 'gender_int']

In [7]:
# Normalise dtypes before the frame is handed to Spark:
#   * missing values are replaced by 0,
#   * the two "last-seen" columns stay floating point,
#   * every other column becomes an integer.
# NOTE(review): filling with 0 makes a missing `age` / `last-seen-*` value
# indistinguishable from a real 0 — confirm that is acceptable for the model.
float_columns = ("last-seen-song", "last-seen-artist")

pandas_df = pandas_df.fillna(0).astype('float')
for c in pandas_df.columns:
    # Membership test on purpose: `c != a or c != b` is true for every column
    # name, which would cast the fractional columns to int (truncating values
    # such as 0.002882 to 0) before they are cast back to float.
    if c not in float_columns:
        pandas_df[c] = pandas_df[c].astype('int')

In [8]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType, FloatType
from pyspark.ml.feature import VectorAssembler

In [9]:
# Spark schema for the cleaned pandas frame, listed in the frame's column order.
# The tenth field is called "label" here, whereas the pandas column in that
# position is `skipped`.
schema_columns = [
    ("weekday", IntegerType),
    ("hour", IntegerType),
    ("weekend", IntegerType),
    ("daytime", IntegerType),
    ("track-total-count", IntegerType),
    ("track-weekday-daytime-count", IntegerType),
    ("last-seen-song", FloatType),
    ("month", IntegerType),
    ("quarter", IntegerType),
    ("label", IntegerType),
    ("artist-total-count", IntegerType),
    ("artist-weekday-daytime-count", IntegerType),
    ("last-seen-artist", FloatType),
    ("age", IntegerType),
    ("gender_int", IntegerType),
]
schema = StructType(
    [StructField(field_name, field_type()) for field_name, field_type in schema_columns]
)

In [None]:
# Alternative schema in which every non-label field is named "features" and the
# label is a double.
# NOTE(review): no visible cell references `schema_lol`, and the repeated
# "features" field name looks unintentional — confirm whether this cell can go.
types_before_label = [IntegerType] * 6 + [FloatType] + [IntegerType] * 2
types_after_label = [IntegerType] * 2 + [FloatType] + [IntegerType] * 2

schema_lol = StructType(
    [StructField("features", field_type()) for field_type in types_before_label]
    + [StructField("label", DoubleType())]
    + [StructField("features", field_type()) for field_type in types_after_label]
)

In [None]:
# Columns packed into the single "features" vector: every schema field
# except "label".
feature_columns = [
    'weekday',
    'hour',
    'weekend',
    'daytime',
    'track-total-count',
    'track-weekday-daytime-count',
    'last-seen-song',
    'month',
    'quarter',
    'artist-total-count',
    'artist-weekday-daytime-count',
    'last-seen-artist',
    'age',
    'gender_int',
]

# Convert the pandas frame to a Spark DataFrame using the explicit schema,
# then append the assembled "features" vector column.
spark_df = sql_sc.createDataFrame(pandas_df, schema=schema)
vecAssembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
spark_df = vecAssembler.transform(spark_df)

In [None]:
# Echo spark_df as the cell output to inspect it after the "features" column was added.
spark_df

In [None]:
# Unbind the pandas frame now that the data lives in spark_df.
# After this, any earlier cell that uses pandas_df needs the CSV reloaded first.
del pandas_df

In [None]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Disabled experiment kept as a bare string literal (nothing in it executes):
# an earlier VectorAssembler variant that wrote its result to `transformed`.
# The active cell above builds the same "features" column directly on spark_df.
'''
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["weekday", "hour", "weekend", "daytime", "track-total-count", 
               "track-weekday-daytime-count", "last-seen-song", "month", "quarter",
               "artist-total-count", "artist-weekday-daytime-count", "last-seen-artist", 
               "age", "gender_int"],
    outputCol="features")

transformed = assembler.transform(spark_df)
'''


In [None]:
# Disabled experiment kept as a bare string literal (nothing in it executes):
# converts `transformed` into an RDD of mllib LabeledPoint rows.
# NOTE(review): `transformed` is only assigned inside the other disabled cell,
# and the text selects a "skipped" column although the Spark schema names that
# field "label" — this would need adjusting before being re-enabled.
'''
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col

transformed = (transformed.select(col("skipped").alias("label"), col("features"))
  .rdd
  .map(lambda row: LabeledPoint(row.label, row.features)))
'''


In [None]:
# Echo spark_df as the cell output once more before splitting.
spark_df

In [None]:
#transformed

### Split the data into train and test

In [None]:
# Random 80/20 split with a fixed seed so the partition is repeatable.
split_weights = [0.8, 0.2]
splits = spark_df.randomSplit(split_weights, seed=1234)
train, test = splits[0], splits[1]

In [None]:
# Echo the training split as the cell output.
train

In [None]:
# Layer sizes handed to the multilayer perceptron, input to output.
# 14 equals the number of columns assembled into "features".
# NOTE(review): 2 outputs presumably correspond to a binary skipped / not-skipped label — confirm.
input_size, hidden_size, output_size = 14, 20, 2
layers = [input_size, hidden_size, output_size]

In [None]:
# Configure the multilayer perceptron classifier.
# NOTE(review): maxIter=2 stops the optimiser after two iterations, which looks
# like a smoke-test setting — confirm before trusting the resulting model.
trainer = MultilayerPerceptronClassifier(
    layers=layers,
    maxIter=2,
    blockSize=128,
    seed=1234,
)

# Fit on the training split; the estimator is left on its default
# feature/label column settings.
model = trainer.fit(train)
