# Initial Practice

## Import Necessary Modules

In [9]:
import numpy as np
import pyspark
import random

## Setup SparkContext

In [2]:
sc = pyspark.SparkContext()

In [3]:
data = list(range(1000))
rdd = sc.parallelize(data, 10)

rdd.getNumPartitions()

10

## Basic functions

In [4]:
rdd.count()

1000

In [5]:
rdd.first()

0

In [6]:
rdd.take(5)

[0, 1, 2, 3, 4]

In [7]:
rdd.top(8)

[999, 998, 997, 996, 995, 994, 993, 992]

In [8]:
rdd.collect()

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,


## More practice

In [10]:
nums = np.array(range(1, 1001))
sales_figures = nums * np.random.rand(1000)

In [11]:
price_items = sc.parallelize(sales_figures, 10)

In [12]:
def sales_tax(num):
    return num * 0.92

In [13]:
revenue_minus_tax = price_items.map(sales_tax)

We now must perform an action for this transformation to be applied.

In [14]:
revenue_minus_tax.take(5)

[0.518921600558826,
 1.4746172827184858,
 0.04282446605029314,
 1.9776333594155198,
 3.8417745897050857]

In [17]:
discounted = price_items.map(lambda x: 0.9 * x)

In [18]:
discounted.take(5)

[0.5076406961988515,
 1.442560385268084,
 0.041893499397025906,
 1.9346413298630085,
 3.7582577507984536]

In [19]:
discounted_minus_tax = price_items.map(lambda x: 0.9 * x).map(sales_tax)

In [20]:
discounted_minus_tax.toDebugString()

b'(10) PythonRDD[9] at RDD at PythonRDD.scala:53 []\n |   ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:195 []'

In [23]:
mapped = price_items.map(lambda x: (x, x * 0.9))
print(mapped.count())
print(mapped.take(5))

1000
[(0.5640452179987239, 0.5076406961988515), (1.6028448725200932, 1.442560385268084), (0.04654833266336211, 0.041893499397025906), (2.149601477625565, 1.9346413298630085), (4.175841945331615, 3.7582577507984536)]


In [22]:
flat_mapped = price_items.flatMap(lambda x: (x, x * 0.9))
print(flat_mapped.count())
print(flat_mapped.take(5))

2000
[0.5640452179987239, 0.5076406961988515, 1.6028448725200932, 1.442560385268084, 0.04654833266336211]


In [24]:
selected_items = price_items.map(sales_tax).map(
    lambda x: x * 0.9).filter(lambda x: x >= 300)

In [26]:
selected_items.count()

285

## Reduce

In [27]:
selected_items.reduce(lambda x, y: x + y)

132041.1073177726

In [28]:
import random
random.seed(42)
# generating simulated users that have bought each item
sales_data = selected_items.map(lambda x: (random.randint(1,50),x))


In [33]:
tot_per_customer = sales_data.reduceByKey(lambda x, y: x + y)
tot_per_customer.sortBy(lambda x: x[1], ascending=False).take(5)

[(30, 6938.2977280991845),
 (39, 5239.507512528502),
 (10, 4896.857448207738),
 (45, 4741.736113762109),
 (43, 4556.175895097461)]

In [37]:
tot_items_per_customer = sales_data.countByKey()
sorted(tot_items_per_customer.items(), key=lambda k:k[1], reverse=True)

[(4, 16),
 (36, 14),
 (8, 14),
 (10, 14),
 (44, 12),
 (18, 12),
 (47, 11),
 (24, 10),
 (27, 10),
 (38, 10),
 (28, 10),
 (35, 10),
 (17, 10),
 (25, 9),
 (42, 9),
 (48, 9),
 (26, 8),
 (33, 8),
 (45, 7),
 (50, 7),
 (37, 7),
 (1, 7),
 (14, 7),
 (5, 6),
 (6, 5),
 (2, 5),
 (49, 5),
 (7, 5),
 (43, 5),
 (30, 5),
 (20, 4),
 (15, 3),
 (23, 3),
 (41, 2),
 (39, 2),
 (29, 1),
 (46, 1),
 (22, 1),
 (12, 1)]

# Machine Learning in Spark

## Import Necessary Modules

In [77]:
from pyspark import SparkContext
from pyspark.ml import feature, Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean


## Start Spark Session

In [40]:
# sc = SparkContext()
spark = SparkSession(sc)

## Read data in

In [43]:
spark_df = spark.read.csv('forestfires.csv', inferSchema='true', header='true')

In [47]:
spark_df.select('rain')

DataFrame[rain: double]

In [48]:
spark_df['rain']

Column<b'rain'>

## Begin Aggregating

In [51]:
forest_fires_months = spark_df.groupby('month').agg({'area':'mean'})

In [52]:
forest_fires_months.collect()

[Row(month='jun', avg(area)=5.841176470588234),
 Row(month='aug', avg(area)=12.489076086956521),
 Row(month='may', avg(area)=19.24),
 Row(month='feb', avg(area)=6.275),
 Row(month='sep', avg(area)=17.942616279069753),
 Row(month='mar', avg(area)=4.356666666666667),
 Row(month='oct', avg(area)=6.638),
 Row(month='jul', avg(area)=14.3696875),
 Row(month='nov', avg(area)=0.0),
 Row(month='apr', avg(area)=8.891111111111112),
 Row(month='dec', avg(area)=13.33),
 Row(month='jan', avg(area)=0.0)]

In [55]:
no_rain = spark_df.filter(spark_df['rain'] == 0)
some_rain = spark_df.filter(spark_df['rain'] > 0)
no_rain.collect()


[Row(X=7, Y=5, month='mar', day='fri', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0),
 Row(X=7, Y=4, month='oct', day='tue', FFMC=90.6, DMC=35.4, DC=669.1, ISI=6.7, temp=18.0, RH=33, wind=0.9, rain=0.0, area=0.0),
 Row(X=7, Y=4, month='oct', day='sat', FFMC=90.6, DMC=43.7, DC=686.9, ISI=6.7, temp=14.6, RH=33, wind=1.3, rain=0.0, area=0.0),
 Row(X=8, Y=6, month='mar', day='sun', FFMC=89.3, DMC=51.3, DC=102.2, ISI=9.6, temp=11.4, RH=99, wind=1.8, rain=0.0, area=0.0),
 Row(X=8, Y=6, month='aug', day='sun', FFMC=92.3, DMC=85.3, DC=488.0, ISI=14.7, temp=22.2, RH=29, wind=5.4, rain=0.0, area=0.0),
 Row(X=8, Y=6, month='aug', day='mon', FFMC=92.3, DMC=88.9, DC=495.6, ISI=8.5, temp=24.1, RH=27, wind=3.1, rain=0.0, area=0.0),
 Row(X=8, Y=6, month='aug', day='mon', FFMC=91.5, DMC=145.4, DC=608.2, ISI=10.7, temp=8.0, RH=86, wind=2.2, rain=0.0, area=0.0),
 Row(X=8, Y=6, month='sep', day='tue', FFMC=91.0, DMC=129.5, DC=692.6, ISI=7.0, temp=13.1, RH=63, wind=5

In [None]:
some_rain.collect()

In [58]:
print('no rain fire area: ',no_rain.select(mean('area')).show(),'\n')
print('some rain fire area: ', some_rain.select(mean('area')).show(), '\n')

+------------------+
|         avg(area)|
+------------------+
|13.023693516699408|
+------------------+

no rain fire area:  None 

+---------+
|avg(area)|
+---------+
|  1.62375|
+---------+

some rain fire area:  None 



In [60]:
summer_months = spark_df.filter(spark_df['month'].isin('jun', 'jul', 'aug'))
winter_months = spark_df.filter(spark_df['month'].isin('dec', 'jan', 'feb'))

summer_months.select(mean('area')).show()
winter_months.select(mean('area')).show()

+------------------+
|         avg(area)|
+------------------+
|12.262317596566525|
+------------------+

+-----------------+
|        avg(area)|
+-----------------+
|7.918387096774193|
+-----------------+



## Machine Learning

In [62]:
fire_df = spark_df.drop('day')
fire_df.head()

Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0)

In [64]:
si = StringIndexer(inputCol='month', outputCol='month_num')
model = si.fit(fire_df)
new_data = model.transform(fire_df)

In [65]:
new_data.head(3)

[Row(X=7, Y=5, month='mar', FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0),
 Row(X=7, Y=4, month='oct', FFMC=90.6, DMC=35.4, DC=669.1, ISI=6.7, temp=18.0, RH=33, wind=0.9, rain=0.0, area=0.0, month_num=6.0),
 Row(X=7, Y=4, month='oct', FFMC=90.6, DMC=43.7, DC=686.9, ISI=6.7, temp=14.6, RH=33, wind=1.3, rain=0.0, area=0.0, month_num=6.0)]

In [66]:
new_data = new_data.drop('month')

In [67]:
new_data.select('month_num').distinct().collect()

[Row(month_num=8.0),
 Row(month_num=0.0),
 Row(month_num=7.0),
 Row(month_num=1.0),
 Row(month_num=4.0),
 Row(month_num=11.0),
 Row(month_num=3.0),
 Row(month_num=2.0),
 Row(month_num=10.0),
 Row(month_num=6.0),
 Row(month_num=5.0),
 Row(month_num=9.0)]

In [68]:
ohe = feature.OneHotEncoderEstimator(inputCols=['month_num'], outputCols=['month_vec'], dropLast=True)
one_hot_encoded = ohe.fit(new_data).transform(new_data)
one_hot_encoded.head(3)

[Row(X=7, Y=5, FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0, month_vec=SparseVector(11, {2: 1.0})),
 Row(X=7, Y=4, FFMC=90.6, DMC=35.4, DC=669.1, ISI=6.7, temp=18.0, RH=33, wind=0.9, rain=0.0, area=0.0, month_num=6.0, month_vec=SparseVector(11, {6: 1.0})),
 Row(X=7, Y=4, FFMC=90.6, DMC=43.7, DC=686.9, ISI=6.7, temp=14.6, RH=33, wind=1.3, rain=0.0, area=0.0, month_num=6.0, month_vec=SparseVector(11, {6: 1.0}))]

In [69]:
features = ['X',
 'Y',
 'FFMC',
 'DMC',
 'DC',
 'ISI',
 'temp',
 'RH',
 'wind',
 'rain',
 'month_vec']

target = 'area'

vector = VectorAssembler(inputCols=features,outputCol='features')
vectorized_df = vector.transform(one_hot_encoded)

In [70]:
vectorized_df.head()

Row(X=7, Y=5, FFMC=86.2, DMC=26.2, DC=94.3, ISI=5.1, temp=8.2, RH=51, wind=6.7, rain=0.0, area=0.0, month_num=2.0, month_vec=SparseVector(11, {2: 1.0}), features=SparseVector(21, {0: 7.0, 1: 5.0, 2: 86.2, 3: 26.2, 4: 94.3, 5: 5.1, 6: 8.2, 7: 51.0, 8: 6.7, 12: 1.0}))

In [71]:
rf_model = RandomForestRegressor(featuresCol='features', 
                                 labelCol='area', 
                                 predictionCol="prediction").fit(vectorized_df)

In [72]:
rf_model.featureImportances

SparseVector(21, {0: 0.1161, 1: 0.0616, 2: 0.1403, 3: 0.143, 4: 0.1272, 5: 0.0878, 6: 0.1341, 7: 0.0581, 8: 0.1061, 9: 0.0, 10: 0.0014, 11: 0.0171, 13: 0.001, 14: 0.0001, 15: 0.0038, 17: 0.0002, 18: 0.0005, 20: 0.0016})

In [73]:
predictions = rf_model.transform(vectorized_df).select("area","prediction")
predictions.head(10)

[Row(area=0.0, prediction=6.805549968733098),
 Row(area=0.0, prediction=6.742433557276543),
 Row(area=0.0, prediction=6.724252996784247),
 Row(area=0.0, prediction=11.183570645996133),
 Row(area=0.0, prediction=7.23779339611102),
 Row(area=0.0, prediction=6.1644429971281465),
 Row(area=0.0, prediction=12.489110962609923),
 Row(area=0.0, prediction=9.072084028201763),
 Row(area=0.0, prediction=6.309335863667351),
 Row(area=0.0, prediction=6.1077676023888205)]

In [75]:
evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area')

In [76]:
evaluator.evaluate(predictions,{evaluator.metricName:"r2"})
evaluator.evaluate(predictions,{evaluator.metricName:"mae"})

13.727340902126976

In [79]:
## instantiating all necessary estimator objects

string_indexer = StringIndexer(inputCol='month',outputCol='month_num',handleInvalid='keep')
one_hot_encoder = feature.OneHotEncoderEstimator(inputCols=['month_num'],outputCols=['month_vec'],dropLast=True)
vector_assember = VectorAssembler(inputCols=features,outputCol='features')
random_forest = RandomForestRegressor(featuresCol='features',labelCol='area')
stages =  [string_indexer, one_hot_encoder, vector_assember,random_forest]

# instantiating the pipeline with all them estimator objects
pipeline = Pipeline(stages=stages)

In [80]:
# creating parameter grid

params = ParamGridBuilder()\
.addGrid(random_forest.maxDepth, [5,10,15])\
.addGrid(random_forest.numTrees, [20,50,100])\
.build()

In [81]:
print('total combinations of parameters: ',len(params))

params[0]

total combinations of parameters:  9


{Param(parent='RandomForestRegressor_ab18f8a84198', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
 Param(parent='RandomForestRegressor_ab18f8a84198', name='numTrees', doc='Number of trees to train (>= 1).'): 20}

In [82]:
## instantiating the evaluator by which we will measure our model's performance
reg_evaluator = RegressionEvaluator(predictionCol='prediction', labelCol='area',metricName = 'mae')
## instantiating crossvalidator estimator
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=params,evaluator=reg_evaluator,parallelism=4)

In [83]:
## fitting crossvalidator
cross_validated_model = cv.fit(fire_df)

In [84]:
cross_validated_model.avgMetrics

[22.11781158673451,
 22.473010219521708,
 22.33571445602858,
 22.551966124221224,
 23.184243647684568,
 23.047228120819433,
 22.648341241756718,
 23.274416622066546,
 23.166367799081776]

In [85]:
predictions = cross_validated_model.transform(spark_df)
predictions.select('prediction','area').show(300)

+------------------+-------+
|        prediction|   area|
+------------------+-------+
| 5.295127864822371|    0.0|
| 4.481399497852698|    0.0|
| 5.202937074626726|    0.0|
| 9.683022137388097|    0.0|
| 3.612564374951808|    0.0|
| 6.621861597214272|    0.0|
| 5.111384085543784|    0.0|
|11.293654094704825|    0.0|
| 6.808791631446708|    0.0|
| 7.229161372570414|    0.0|
|5.8924626042046535|    0.0|
| 6.417551908402662|    0.0|
|  9.33048382149785|    0.0|
|  8.84572125569065|    0.0|
| 16.52824603326646|    0.0|
| 6.735667462246573|    0.0|
|  5.33865685123856|    0.0|
| 8.487640610690004|    0.0|
| 4.360220705042755|    0.0|
| 4.888585519912934|    0.0|
|12.103819127771807|    0.0|
| 6.673590563565362|    0.0|
| 8.583734868235103|    0.0|
| 9.667986310363805|    0.0|
| 6.252920169787965|    0.0|
| 6.882734781596405|    0.0|
| 6.225445215215434|    0.0|
| 6.686718646420644|    0.0|
| 5.071878813744122|    0.0|
|10.588447167232795|    0.0|
| 60.93547845771352|    0.0|
| 6.2940596164

In [86]:
type(cross_validated_model.bestModel)

pyspark.ml.pipeline.PipelineModel

In [87]:
cross_validated_model.bestModel.stages

[StringIndexer_9d2e4b523fa3,
 OneHotEncoderEstimator_74f6d023a5fc,
 VectorAssembler_cbd00d857a56,
 RandomForestRegressionModel (uid=RandomForestRegressor_ab18f8a84198) with 20 trees]

In [88]:
optimal_rf_model = cross_validated_model.bestModel.stages[3]

In [89]:
optimal_rf_model.featureImportances

SparseVector(22, {0: 0.0577, 1: 0.0303, 2: 0.1391, 3: 0.035, 4: 0.1544, 5: 0.0874, 6: 0.1081, 7: 0.0765, 8: 0.1927, 10: 0.0075, 11: 0.0611, 12: 0.0001, 13: 0.0477, 14: 0.0004, 15: 0.0012, 16: 0.0001, 17: 0.0006, 18: 0.0001, 20: 0.0001})

In [90]:
optimal_rf_model.getNumTrees

20