In [148]:
import pyspark as ps
import json
import ast
from pyspark.sql.functions import *
from pyspark.sql.types import DoubleType, FloatType
from src import DataCleaning
from src import FeatureEngineer
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor

In [149]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [150]:
spark = (ps.sql.SparkSession.builder
        .appName("sandbox")
        .getOrCreate()
        )

sc = spark.sparkContext

In [151]:
rdd = sc.textFile('data/raw_data.csv')

In [152]:
rdd.count()

35979

In [153]:
rdd_formatted = DataCleaning.clean_data(rdd)#.filter(lambda x: x[-1] != '')

In [154]:
header = ['name', 'required_age', 'windows', 'mac', 'linux', 'release_date', 'price']

In [157]:
developers = rdd_formatted.map(lambda x: x[3])
publishers = rdd_formatted.map(lambda x: x[4])
devCounts = developers.map(mkdict).reduce(combineDict)
pubCounts = publishers.map(mkdict).reduce(combineDict)

In [193]:
rdd_engineered = FeatureEngineer.feature_engineer_wrapper(rdd_formatted)

In [194]:
len(rdd_engineered.take(229)[-1])

63

In [195]:
rdd_engineered.count()

35979

In [196]:
data = np.array(rdd_engineered.filter(lambda x: x[-1] != None and len(x) == 63).collect())

In [197]:
data

array([['The Consuming Shadow', '0', '1', ..., '0', '0', '9.99'],
       ['Stacks TNT', '0', '1', ..., '0', '0', '4.99'],
       ['Sky To Fly: Faster Than Wind', '0', '1', ..., '0', '0', '3.99'],
       ...,
       ['Farmocalypse', '0', '1', ..., '0', '0', '4.99'],
       ['Fox Hime', '0', '1', ..., '0', '0', '1.99'],
       ['AtmaSphere', '0', '1', ..., '0', '0', '2.99']], dtype='<U135')

In [198]:
X = data[:, 1:-1].astype(np.float64)
y = data[:, -1].astype(np.float64)

In [205]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = .1)

In [206]:
LR = LinearRegression()
LR.fit(X_train, y_train)
preds = LR.predict(X_test)
print(np.mean((preds - y_test)**2))

90.64839107231195


In [207]:
print(preds)

[ 6.14043123  7.066921    8.1889496  ...  4.93146966  8.37464933
 12.4531406 ]


In [211]:
print(np.mean(y_test))
print(np.std(y_test))

7.39033729521362
10.137324109678588


In [209]:
RFC = RandomForestRegressor()
RFC.fit(X_train, y_train)
preds = RFC.predict(X_test)
print(np.mean((preds - y_test)**2))

114.28588780797632


In [210]:
GDB = GradientBoostingRegressor()
GDB.fit(X_train, y_train)
preds = GDB.predict(X_test)
print(np.mean((preds - y_test)**2))
print(np.mean(np.abs((preds - y_test))))

76.81422472446147
4.612750861092786
