In [1]:
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import scale
import pandas as pd
from helper import *
import tensorflow as tf

pd.options.display.max_columns = None
import findspark
import pyspark
from pyspark.sql import SQLContext
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import col


## Import DataSet

In [2]:
sumdata_url = "https://www.dropbox.com/sh/euppz607r6gsen2/AABABUTdx7YqCeBquA1Ky7z8a/The%20SUM%20dataset?dl=1#"
housing_price_url = "https://www.dropbox.com/sh/euppz607r6gsen2/AAAVLZzU4E7ro0BiRzPG3pP8a/House%20Prices?dl=1"
all_urls = [sumdata_url, housing_price_url]

In [3]:
get_data(all_urls) # retrieves the data if there is no data folder

In [4]:
sumdata_noise_path = "data/with noise/The SUM dataset, with noise.csv"
sumdata_path = "data/without noise/The SUM dataset, without noise.csv"
housing_price_path = "data/housing dataset.csv" # has more than 30 features
# need one more
# what a brilliant idea to name files with space

In [5]:
data_chunks = [100, 500, 1000, 5000, 10000, 50000, 100000, 500000,
1000000, 5000000, 10000000, 50000000, 100000000]

## Load datasets sum_noise

In [6]:
sumdata_noise = pd.read_csv(sumdata_noise_path, delimiter=";")
sumdata_noise.head(n=1)

Unnamed: 0,Instance,Feature 1,Feature 2,Feature 3,Feature 4,Feature 5 (meaningless),Feature 6,Feature 7,Feature 8,Feature 9,Feature 10,Noisy Target,Noisy Target Class
0,1,66957,74432,96087,103120,64272,150633,181787,180349,216912,304071,1434819,Very Large Number


## Preprocess sum_noise dataset

- Remove 'Instance' as it simply represents the row number
- Extract 'Nosiy Target' as regression target
- Extract 'Nosiy Class' as classification target
- Extract rest columns as explananatory variables
- Apply Feature Scaling to the dataset 

- Ensure all dataframe has been converted to numpy array


In [7]:
# Remove 'Instance' as it simply represents the row number
sumdata_noise.drop('Instance', axis = 1)

# Extract 'Nosiy Target' as regression target
sumdata_noise_reg_Y = sumdata_noise['Noisy Target']

sumdata_noise_reg_Y = scale(sumdata_noise_reg_Y.values.reshape(sumdata_noise_reg_Y.shape[0],1))
# Extract 'Nosiy Target Class' as regression target
sumdata_noise_classif_Y = sumdata_noise['Noisy Target Class']

# Extract rest columns as explananatory variables
sumdata_noise_X = scale(sumdata_noise.iloc[:, 1:-2].as_matrix()) 

 



In [8]:
sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)


In [9]:
# converting pandas -> spark dataframe
 

data_df = sqlContext.createDataFrame(sumdata_noise.drop('Noisy Target Class', axis=1))

In [10]:
input_features = list(sumdata_noise.columns)[:-2]
ouput_label = list(sumdata_noise.columns)[-2]

In [11]:
# spark needs the dataframe to be labeled with "features" and "labels"
assembler = VectorAssembler(
    inputCols=input_features,
    outputCol="features")
transformed = assembler.transform(data_df)

In [12]:
from pyspark.mllib import linalg as mllib_linalg
from pyspark.ml import linalg as ml_linalg
from pyspark.ml.linalg import Vectors

def as_old(v):
    if isinstance(v, ml_linalg.SparseVector):
        return mllib_linalg.SparseVector(v.size, v.indices, v.values)
    if isinstance(v, ml_linalg.DenseVector):
        return  Vectors.sparse(v.values)
    raise ValueError("Unsupported type {0}".format(type(v)))

In [None]:
data= (transformed.select(col(ouput_label).alias("label"), col("features"))
  .rdd
  .map(lambda row: LabeledPoint(row.label, as_old(row.features)))).toDF()

In [24]:
features_names =  list(sumdata_noise.columns)[:-1]
df = data_df.select(features_names).rdd
print(df.take(5))
temp = df.map(lambda line:LabeledPoint(line[-1],[line[1:-1]]))
temp.take(5)

[Row(Instance=1, Feature 1=66957, Feature 2=74432, Feature 3=96087, Feature 4=103120, Feature 5 (meaningless)=64272, Feature 6=150633, Feature 7=181787, Feature 8=180349, Feature 9=216912, Feature 10=304071, Noisy Target=1434819), Row(Instance=2, Feature 1=96030, Feature 2=86875, Feature 3=108299, Feature 4=148025, Feature 5 (meaningless)=16965, Feature 6=253819, Feature 7=258672, Feature 8=268851, Feature 9=404599, Feature 10=543092, Noisy Target=2148748), Row(Instance=3, Feature 1=26212, Feature 2=23398, Feature 3=27668, Feature 4=39678, Feature 5 (meaningless)=23062, Feature 6=65873, Feature 7=65660, Feature 8=68508, Feature 9=82617, Feature 10=115418, Noisy Target=476405), Row(Instance=4, Feature 1=28363, Feature 2=33381, Feature 3=42447, Feature 4=35270, Feature 5 (meaningless)=8980, Feature 6=52885, Feature 7=79144, Feature 8=85741, Feature 9=86806, Feature 10=147368, Noisy Target=635169), Row(Instance=5, Feature 1=38960, Feature 2=50255, Feature 3=79879, Feature 4=91885, Feature

[LabeledPoint(1434819.0, [66957.0,74432.0,96087.0,103120.0,64272.0,150633.0,181787.0,180349.0,216912.0,304071.0]),
 LabeledPoint(2148748.0, [96030.0,86875.0,108299.0,148025.0,16965.0,253819.0,258672.0,268851.0,404599.0,543092.0]),
 LabeledPoint(476405.0, [26212.0,23398.0,27668.0,39678.0,23062.0,65873.0,65660.0,68508.0,82617.0,115418.0]),
 LabeledPoint(635169.0, [28363.0,33381.0,42447.0,35270.0,8980.0,52885.0,79144.0,85741.0,86806.0,147368.0]),
 LabeledPoint(1221471.0, [38960.0,50255.0,79879.0,91885.0,64037.0,127193.0,115760.0,174069.0,184805.0,250659.0])]

In [25]:

from pyspark.mllib.util import MLUtils
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler

In [26]:
features = df.map(lambda row: row[1:])

In [41]:

standardizer = StandardScaler()
model = standardizer.fit(features)
features_transform = model.transform(features)



TypeError: not supported type: <class 'numpy.ndarray'>

In [28]:
lab = df.map(lambda row: row[0])
transformedData = lab.zip(features_transform)

In [29]:
transformedData = transformedData.map(lambda row: LabeledPoint(row[0],[row[1]]))

In [30]:

trainingData, testingData = transformedData.randomSplit([.8,.2],seed=1234)

In [32]:
from pyspark.mllib.regression import LinearRegressionWithSGD
linearModel = LinearRegressionWithSGD.train(trainingData,1000,.2)



In [33]:
linearModel.weights

DenseVector([13002.3065, 12663.2027, 12956.2243, 12748.2522, 117847.7154, 12747.9969, 12574.0738, 12585.0482, 12579.4027, 12460.0509, 12275.0822])

In [34]:

from pyspark.mllib.evaluation import RegressionMetrics
prediObserRDDin = trainingData.map(lambda row: (float(linearModel.predict(row.features[0])),row.label))

In [35]:
metrics = RegressionMetrics(prediObserRDDin)

In [36]:
metrics.r2


-0.438163379331318

In [37]:

prediObserRDDout = testingData.map(lambda row: (float(linearModel.predict(row.features[0])),row.label))
metrics = RegressionMetrics(prediObserRDDout)

In [38]:

metrics.rootMeanSquaredError

335877.77249449247

##### http://www.techpoweredmath.com/spark-dataframes-mllib-tutorial/

In [40]:
features_transform.toDF()

TypeError: not supported type: <class 'numpy.ndarray'>