In [1]:
import findspark
findspark.init()
import pyspark
import pandas as pd
from datetime import datetime, date

# Spark SQL

In [4]:
from pyspark.sql import Row, SparkSession, SQLContext
# Reuse the active SparkSession if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# Handle on the session's SparkContext, used by the RDD-based cells (`sc.textFile`).
sc = spark.sparkContext

In [3]:
# Build a DataFrame from a list of Row objects, with the column types spelled
# out in a DDL-style schema string.
rows = [
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
]
row_schema = 'a long, b double, c string, d date, e timestamp'
df = spark.createDataFrame(rows, schema=row_schema)
print(df)

# Build the same kind of DataFrame from a pandas DataFrame; no schema is
# passed here, so Spark derives the column types from the pandas data.
column_data = {
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [
        datetime(2000, 1, 1, 12, 0),
        datetime(2000, 1, 2, 12, 0),
        datetime(2000, 1, 3, 12, 0),
    ],
}
pandas_df = pd.DataFrame(column_data)
df = spark.createDataFrame(pandas_df)
print(df)

# Build the DataFrame from an RDD of plain tuples; only the column names are
# given, so the types are inferred from the tuple values.
records = [
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0)),
]
rdd = spark.sparkContext.parallelize(records)
df = spark.createDataFrame(rdd, schema=list('abcde'))
print(df)


DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]


In [4]:
# Render the rows of `df` as a text table.
df.show()
# Print the column names, types and nullability as a tree.
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [5]:
# Turn on eager evaluation so that a bare `df` at the end of a cell is rendered
# as a table of its rows instead of the `DataFrame[a: bigint, ...]` summary;
# handy for a quick look once the data gets larger.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [6]:
# RDD wrangling: (name, age) tuples -> RDD -> Row objects -> DataFrame
list_p = [
    ('John', 19),
    ('Smith', 29),
    ('Adam', 35),
    ('Henry', 50),
]
rdd = spark.sparkContext.parallelize(list_p)
# Give each tuple named fields; the age is coerced to int on the way.
ppl = rdd.map(lambda person: Row(name=person[0], age=int(person[1])))
DF_ppl = spark.createDataFrame(ppl)

In [46]:
from pyspark import SparkFiles

# Download the adult-income CSV through Spark's file distribution mechanism,
# then read the fetched copy with a header row and inferred column types.
url = "https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv"
spark.sparkContext.addFile(url)
adult_csv_path = SparkFiles.get("adult_data.csv")
df = spark.read.csv(adult_csv_path, header=True, inferSchema=True)

# First five rows, without truncating long cell values.
df.show(5, truncate=False)

+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|x  |age|workclass|fnlwgt|education   |educational-num|marital-status    |occupation       |relationship|race |gender|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+---+---------+------+------------+---------------+------------------+-----------------+------------+-----+------+------------+------------+--------------+--------------+------+
|1  |25 |Private  |226802|11th        |7              |Never-married     |Machine-op-inspct|Own-child   |Black|Male  |0           |0           |40            |United-States |<=50K |
|2  |38 |Private  |89814 |HS-grad     |9              |Married-civ-spouse|Farming-fishing  |Husband     |White|Male  |0           |0           |50            |United-States |<=50K |
|3  |28 |Local-gov|336951|Assoc-acdm  |12             |Married-civ-spouse|Protective-serv 

In [10]:
# Data wrangling -- other one-liners to try (uncomment to run):
# df.select('age','fnlwgt').show(5)
# df.filter(df.age > 40).count()
# df.groupby("education").count().sort("count", ascending=True).show()
# df.groupby('marital-status').agg({'capital-gain': 'mean'}).show()

# Contingency table of age against workclass. The first column of the result
# is named "age_workclass", which is what the sort below refers to.
age_by_workclass = df.crosstab('age', 'workclass')
age_by_workclass.sort("age_workclass").show()

+-------------+---+-----------+---------+------------+-------+------------+----------------+---------+-----------+
|age_workclass|  ?|Federal-gov|Local-gov|Never-worked|Private|Self-emp-inc|Self-emp-not-inc|State-gov|Without-pay|
+-------------+---+-----------+---------+------------+-------+------------+----------------+---------+-----------+
|           17| 97|          2|       21|           2|    454|           8|               9|        2|          0|
|           18|154|          5|       15|           4|    638|          12|              20|       14|          0|
|           19|183|          6|       18|           0|    784|           6|              24|       29|          3|
|           20|184|         13|       20|           2|    834|          11|              16|       33|          0|
|           21|147|          4|       22|           0|    859|           4|              15|       44|          1|
|           22|128|         16|       34|           0|    924|           9|     

In [11]:
# The exciting part: DataFrame wrangling and SQL can be mixed freely.
# Register `df` as a temporary view named "tableA" so it can be queried with SQL.
df.createOrReplaceTempView("tableA")
# Count the rows of the view through a SQL query.
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|   48842|
+--------+



# Spark MLlib

In [16]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

In [14]:
# Load the header file
header = sc.textFile('./cal_housing.domain')

# Load the data file and split every line on commas, so each record becomes a
# list of string fields
rdd = sc.textFile('./cal_housing.data').map(lambda line: line.split(","))

In [17]:
# `Row` gives each positional field of a record a name
from pyspark.sql import Row


def _fields_to_row(fields):
    """Name the nine positional fields of one split input line."""
    return Row(
        longitude=fields[0],
        latitude=fields[1],
        housingMedianAge=fields[2],
        totalRooms=fields[3],
        totalBedRooms=fields[4],
        population=fields[5],
        households=fields[6],
        medianIncome=fields[7],
        medianHouseValue=fields[8],
    )


# Turn the RDD of field lists into a DataFrame with named columns
df = rdd.map(_fields_to_row).toDF()

In [27]:
def convertColumn(df, names, newType):
    """Cast each of the listed columns of `df` to `newType`.

    Args:
        df: DataFrame-like object supporting `df[name]` and `withColumn`.
        names: iterable of the column names to cast.
        newType: target type handed to each column's `cast`.

    Returns:
        The result of replacing every listed column with its cast version,
        one `withColumn` call per name; `df` itself when `names` is empty.
    """
    converted = df
    for column_name in names:
        cast_column = converted[column_name].cast(newType)
        converted = converted.withColumn(column_name, cast_column)
    return converted

# Names of all nine columns of `df`
columns = [
    'households',
    'housingMedianAge',
    'latitude',
    'longitude',
    'medianHouseValue',
    'medianIncome',
    'population',
    'totalBedRooms',
    'totalRooms',
]

# Convert the `df` columns to `FloatType()`
float_type = FloatType()
df = convertColumn(df, columns, float_type)

In [28]:
# Express the median house value in units of 100,000.
# The column is referenced through `df[...]` rather than `col(...)`: no earlier
# cell imports `col`, so `col(...)` here raises NameError when the notebook is
# run top-to-bottom on a fresh kernel.
# NOTE: `df` is overwritten, so running this cell twice divides twice.
df = df.withColumn("medianHouseValue", df["medianHouseValue"] / 100000)

[Row(longitude=-122.2300033569336, latitude=37.880001068115234, housingMedianAge=41.0, totalRooms=880.0, totalBedRooms=129.0, population=322.0, households=126.0, medianIncome=8.325200080871582, medianHouseValue=452600.0),
 Row(longitude=-122.22000122070312, latitude=37.86000061035156, housingMedianAge=21.0, totalRooms=7099.0, totalBedRooms=1106.0, population=2401.0, households=1138.0, medianIncome=8.301400184631348, medianHouseValue=358500.0)]

In [30]:
# Import all from `sql.functions` if you haven't yet
# NOTE(review): the wildcard also rebinds names such as `sum`, `min` and `max`
# to the Spark column functions for the rest of the notebook.
from pyspark.sql.functions import *

# The three ratio expressions, defined once and reused below
rooms_per_household_expr = col("totalRooms") / col("households")
population_per_household_expr = col("population") / col("households")
bedrooms_per_room_expr = col("totalBedRooms") / col("totalRooms")

# Stand-alone single-column DataFrames holding each ratio
roomsPerHousehold = df.select(rooms_per_household_expr)
populationPerHousehold = df.select(population_per_household_expr)
bedroomsPerRoom = df.select(bedrooms_per_room_expr)

# Append the ratios to `df` as new named columns
df = (
    df.withColumn("roomsPerHousehold", rooms_per_household_expr)
      .withColumn("populationPerHousehold", population_per_household_expr)
      .withColumn("bedroomsPerRoom", bedrooms_per_room_expr)
)

In [31]:
# Keep only the columns used for modelling, with `medianHouseValue` first and
# the remaining columns after it
selected_columns = [
    "medianHouseValue",
    "totalBedRooms",
    "population",
    "households",
    "medianIncome",
    "roomsPerHousehold",
    "populationPerHousehold",
    "bedroomsPerRoom",
]
df = df.select(*selected_columns)

In [32]:
# `DenseVector` packs the feature values of a row into a single vector
from pyspark.ml.linalg import DenseVector


def _split_label_and_features(row):
    """Return (first value of the row, DenseVector of all remaining values)."""
    return (row[0], DenseVector(row[1:]))


# One (label, feature vector) pair per row of `df`
input_data = df.rdd.map(_split_label_and_features)

# Replace `df` with a two-column DataFrame built from those pairs
df = spark.createDataFrame(input_data, ["label", "features"])

In [42]:
# Peek at the first (label, features) pair of `input_data`.
# NOTE(review): the AttributeError recorded for this cell carries a later
# execution count than the `spark.stop()` cell, so it was presumably run after
# the session had been stopped -- re-run in notebook order with a live session
# to confirm the cell itself succeeds.
input_data.take(1)

AttributeError: 'NoneType' object has no attribute 'sc'

In [33]:
# Scaler used to standardise the feature vectors
from pyspark.ml.feature import StandardScaler

# Reads the "features" column and writes the result to "features_scaled"
standardScaler = StandardScaler(
    inputCol="features",
    outputCol="features_scaled",
)

# Learn the scaling statistics from `df` ...
scaler = standardScaler.fit(df)

# ... and apply them, adding the "features_scaled" column
scaled_df = scaler.transform(df)

In [35]:
# Import `LinearRegression`
from pyspark.ml.regression import LinearRegression

# Split the data into train and test sets (80% / 20%); the seed fixes the split
train_data, test_data = scaled_df.randomSplit([.8, .2], seed=1234)

# Initialize `lr`.
# `featuresCol` is set explicitly: the default is "features", which would make
# the model train on the unscaled vectors and leave the "features_scaled"
# column produced by the StandardScaler step unused.
lr = LinearRegression(
    featuresCol="features_scaled",
    labelCol="label",
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

# Fit the model on the training split
linearModel = lr.fit(train_data)


In [36]:
# Generate predictions for the held-out split
predicted = linearModel.transform(test_data)

# Predictions and "known" correct labels as separate RDDs (kept so that any
# code referring to these names keeps working)
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])

# Pair every prediction with the label of the same row in a single pass.
# Selecting both columns together avoids `RDD.zip`, which requires the two
# RDDs to have identical partitioning and per-partition element counts.
predictionAndLabel = (
    predicted.select("prediction", "label")
    .rdd.map(lambda row: (row[0], row[1]))
    .collect()
)

# Show the first 5 (prediction, label) pairs
predictionAndLabel[:5]

[(203340.32821388176, 14999.0),
 (159798.89517170648, 22500.0),
 (91571.76463829097, 38800.0),
 (75699.07270083236, 39400.0),
 (115646.38626903069, 39600.0)]

In [37]:
# Coefficients for the model.
# Printed explicitly: a notebook cell only displays its last expression, so a
# bare `linearModel.coefficients` on this line would be evaluated and discarded.
print("Coefficients:", linearModel.coefficients)

# Intercept for the model (displayed as the cell's result)
linearModel.intercept

-98869.66177180322

In [38]:
# Get the RMSE.
# Printed explicitly: a notebook cell only displays its last expression, so a
# bare expression on this line would be evaluated and discarded.
# Both metrics come from `linearModel.summary`, i.e. they describe the fit on
# the training data, not on `test_data`.
print("RMSE:", linearModel.summary.rootMeanSquaredError)

# Get the R2 (displayed as the cell's result)
linearModel.summary.r2

0.5465738375168637

In [40]:
# Shut down the Spark session. Run this last: cells that use `spark`, `sc` or
# RDDs created from them are expected to fail until a new session is created.
spark.stop()