# PySpark DataFrames

In [1]:
from pyspark import SparkContext
sc = SparkContext()

In [2]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Python Spark Regression Example").config("spark.some.config.option", "some-value").getOrCreate()

In [3]:
regressionDataFrame = spark.read.csv("Advertising.csv", header=True, inferSchema=True)

In [4]:
regressionDataFrame.show(5)

+---+-----+-----+---------+-----+
|_c0|   TV|radio|newspaper|sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
+---+-----+-----+---------+-----+
only showing top 5 rows



In [5]:
regressionDataFrame = regressionDataFrame.drop("_c0")

In [6]:
regressionDataFrame.show(5)

+-----+-----+---------+-----+
|   TV|radio|newspaper|sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
| 44.5| 39.3|     45.1| 10.4|
| 17.2| 45.9|     69.3|  9.3|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
+-----+-----+---------+-----+
only showing top 5 rows



In [7]:
regressionDataFrame.columns

['TV', 'radio', 'newspaper', 'sales']

In [8]:
regressionDataFrame.groupBy(regressionDataFrame.TV > 100).count().show(5)

+----------+-----+
|(TV > 100)|count|
+----------+-----+
|      true|  130|
|     false|   70|
+----------+-----+



In [9]:
regressionDataFrame.count()

200

In [10]:
regressionDataFrame.filter(regressionDataFrame.TV > 100).show(5)

+-----+-----+---------+-----+
|   TV|radio|newspaper|sales|
+-----+-----+---------+-----+
|230.1| 37.8|     69.2| 22.1|
|151.5| 41.3|     58.5| 18.5|
|180.8| 10.8|     58.4| 12.9|
|120.2| 19.6|     11.6| 13.2|
|199.8|  2.6|     21.2| 10.6|
+-----+-----+---------+-----+
only showing top 5 rows



In [11]:
regressionDataFrame.select(regressionDataFrame.TV > 100).show(5)

+----------+
|(TV > 100)|
+----------+
|      true|
|     false|
|     false|
|      true|
|      true|
+----------+
only showing top 5 rows



In [12]:
regressionDataFrame.describe().show()

+-------+-----------------+------------------+------------------+------------------+
|summary|               TV|             radio|         newspaper|             sales|
+-------+-----------------+------------------+------------------+------------------+
|  count|              200|               200|               200|               200|
|   mean|         147.0425|23.264000000000024|30.553999999999995|14.022500000000003|
| stddev|85.85423631490805|14.846809176168728| 21.77862083852283| 5.217456565710477|
|    min|              0.7|               0.0|               0.3|               1.6|
|    max|            296.4|              49.6|             114.0|              27.0|
+-------+-----------------+------------------+------------------+------------------+



In [13]:
from pyspark.sql.functions import mean
from pyspark.sql.functions import min as psp_min
from pyspark.sql.functions import max as psp_max

regressionDataFrame.select([mean("TV"), psp_min("TV"), psp_max("TV")])

DataFrame[avg(TV): double, min(TV): double, max(TV): double]

In [14]:
titanicDF = spark.read.csv("titanic.csv", header=True, inferSchema=True)

In [15]:
LOGICAL_AND_STATEMENT = (titanicDF.Fare > 100) & (titanicDF.Survived == 1)

titanicDF.groupBy(LOGICAL_AND_STATEMENT).count().show()

+---------------------------------+-----+
|((Fare > 100) AND (Survived = 1))|count|
+---------------------------------+-----+
|                             true|   39|
|                            false|  852|
+---------------------------------+-----+



In [16]:
LOGICAL_PARAM_1 = titanicDF.Fare > 100
LOGICAL_PARAM_2 = titanicDF.Survived == 1

titanicDF.groupBy(LOGICAL_PARAM_1, LOGICAL_PARAM_2).count().show()

+------------+--------------+-----+
|(Fare > 100)|(Survived = 1)|count|
+------------+--------------+-----+
|        true|         false|   14|
|        true|          true|   39|
|       false|         false|  535|
|       false|          true|  303|
+------------+--------------+-----+



In [29]:
titanicDF.groupBy(LOGICAL_PARAM_1, LOGICAL_PARAM_2).count().show()

+------------+--------------+-----+
|(Fare > 100)|(Survived = 1)|count|
+------------+--------------+-----+
|        true|         false|   14|
|        true|          true|   39|
|       false|         false|  535|
|       false|          true|  303|
+------------+--------------+-----+



In [18]:
from time import time as t

def hello_world():
    t0 = t()
    import sklearn
    from sklearn.datasets import load_boston
    print("hello world")
    t1 = t()
    print("time elapsed: {:.6f}".format(t1 - t0))

In [19]:
hello_world()

hello world
time elapsed: 2.844492


## Linear Regression with PySpark

In [45]:
from pyspark.mllib.regression import LinearRegressionWithSGD
from pyspark.mllib.regression import LabeledPoint as LPoint

In [34]:
regressionDataRDD = regressionDataFrame.rdd.map(list)

In [35]:
regressionDataRDD.take(5)

[[230.1, 37.8, 69.2, 22.1],
 [44.5, 39.3, 45.1, 10.4],
 [17.2, 45.9, 69.3, 9.3],
 [151.5, 41.3, 58.5, 18.5],
 [180.8, 10.8, 58.4, 12.9]]

In [37]:
regressionDataLabeledPoints = regressionDataRDD.map(lambda data: LPoint(data[3], data[0:3]))

In [38]:
regressionDataLabeledPoints.take(5)

[LabeledPoint(22.1, [230.1,37.8,69.2]),
 LabeledPoint(10.4, [44.5,39.3,45.1]),
 LabeledPoint(9.3, [17.2,45.9,69.3]),
 LabeledPoint(18.5, [151.5,41.3,58.5]),
 LabeledPoint(12.9, [180.8,10.8,58.4])]

In [39]:
regressionLabeledDataSplit = regressionDataLabeledPoints.randomSplit([0.7, 0.3])

In [41]:
regressionLabeledDataSplitTrainData, regressionLabeledDataSplitTestData = regressionLabeledDataSplit[0], regressionLabeledDataSplit[1]

In [42]:
regressionLabeledDataSplitTrainData.take(5)

[LabeledPoint(10.4, [44.5,39.3,45.1]),
 LabeledPoint(9.3, [17.2,45.9,69.3]),
 LabeledPoint(18.5, [151.5,41.3,58.5]),
 LabeledPoint(12.9, [180.8,10.8,58.4]),
 LabeledPoint(11.8, [57.5,32.8,23.5])]

In [43]:
regressionLabeledDataSplitTestData.take(5)

[LabeledPoint(22.1, [230.1,37.8,69.2]),
 LabeledPoint(7.2, [8.7,48.9,75.0]),
 LabeledPoint(4.8, [8.6,2.1,1.0]),
 LabeledPoint(8.6, [66.1,5.8,24.2]),
 LabeledPoint(17.4, [214.7,24.0,4.0])]

In [84]:
def test_pyspark_LinReg(ITERATION, STEP):
    LinRegModel = LinearRegressionWithSGD.train(data = regressionLabeledDataSplitTrainData,
                                            iterations = ITERATION,
                                            step = STEP,
                                            intercept = True)
#     return print("For Iteration #{} and Step Size {}, the y-Intercept is {}".format(ITERATION, STEP, LinRegModel.intercept))
    return LinRegModel.intercept

In [86]:
ALL_ITERATIONS = [100, 200, 300]
ALL_STEPS = [0.1, 0.02, 0.01, 0.001, 0.0001, 0.00001, 0.000001]
ALL_INTERCEPTS = list()

for ITERATION in ALL_ITERATIONS:
    for STEP in ALL_STEPS:
        ALL_INTERCEPTS.append(test_pyspark_LinReg(ITERATION, STEP))

For Iteration #100 and Step Size 0.1, the y-Intercept is -2.0051520000778176e+263
For Iteration #100 and Step Size 0.02, the y-Intercept is -9.899990180489119e+192
For Iteration #100 and Step Size 0.01, the y-Intercept is -2.369054898460538e+162
For Iteration #100 and Step Size 0.001, the y-Intercept is -3.9533446750905946e+51
For Iteration #100 and Step Size 0.0001, the y-Intercept is 1.0013219428772064
For Iteration #100 and Step Size 1e-05, the y-Intercept is 1.0004123211637506
For Iteration #100 and Step Size 1e-06, the y-Intercept is 1.0000389205251445
For Iteration #200 and Step Size 0.1, the y-Intercept is nan
For Iteration #200 and Step Size 0.02, the y-Intercept is nan
For Iteration #200 and Step Size 0.01, the y-Intercept is -6.706428168883327e+297
For Iteration #200 and Step Size 0.001, the y-Intercept is -5.871989922140454e+64
For Iteration #200 and Step Size 0.0001, the y-Intercept is 1.0013219428772064
For Iteration #200 and Step Size 1e-05, the y-Intercept is 1.000412321

In [81]:
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score
from sklearn.model_selection import train_test_split

df = pd.read_csv("Advertising.csv")
feature_cols = ["TV", "radio", "newspaper"]

# Use the top 150 companies to train the Linear Regression Model
# X_train = df[feature_cols][:150]
# Y_train = df.sales[:150]
X, y = df[feature_cols], df.sales
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.75, test_size=0.25)

# Instansiate the model (Linear Regression) and train it
print("Generating a linear regression on the top 150 companies (first one)")
sales_reg = LinearRegression()
sales_reg.fit(X_train, y_train)

print("Printing the coefficient and y intercept of our first 150 sales")
print(sales_reg.coef_)
print(sales_reg.intercept_)

Generating a linear regression on the top 150 companies (first one)
Printing the coefficient and y intercept of our first 150 sales
[0.04463628 0.18917929 0.00288182]
2.9988647690848858
