In [1]:
!pip install pyspark



In [2]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

# Start (or reuse) a Spark session running in local mode on all available cores,
# and keep a handle to its SparkContext as `sc`.
spark = (
    SparkSession.builder
    .master('local[*]')
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# Mount Google Drive into the Colab filesystem at /content/drive so the
# notebook can read and write files under that path.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Switch the notebook's working directory to the course folder on Drive
# and list its contents.
%cd /content/drive/My Drive/BigDataColab25FallShalRitvikSinha
!ls

/content/drive/My Drive/BigDataColab25FallShalRitvikSinha
 10.Spark-ClassificationShalRitvikSinha.ipynb
'6. MR-WordCountReducer_Shal_Ritvik_Sinha.ipynb'
 7.Spark-WordCountShal_Ritvik_Sinha.ipynb
 8.Spark-SQLShalRitvikSinha.ipynb
 8.Spark-StreamingShalRitvikSinha.ipynb
 9.Spark-Handling-missing-values.ShalRitvikSinha.ipynb
 ad-clicks.csv.gz
 Alice.txt
 BigDataShalRitvikSinhaTest1.ipynb
 BigDataShalRitvikSinhaTest2.ipynb
 buy-clicks.csv.gz
 Cheshire
 daily_weather.csv
 game-clicks.csv.gz
 hadoop-3.3.6
 hadoop-3.3.6.tar.gz
 hadoop-3.3.6.tar.gz.1
 join1_FileA.txt
 join1_FileB.txt
 join1_mapper.py
 join1_reducer.py
 join2_genchanA.txt
 join2_genchanB.txt
 join2_genchanc.txt
 join2_gennumA.txt
 join2_gennumB.txt
 join2_gennumC.txt
 join2_mapper.py
 join2_reducer.py
 make_join2data.py
 MT
 MT1_B
 MT2
 MT3
 MT_Q1_make_data.py
 MT_Q2_make_data.py
 MT_Q3_make_data.py
 out0
 out_4Data_Join
 out_4WC
 outDataJoin2
 out_Quiz3
 out_wordmedian
 Shal_Ritvik_Sinha_Mid-term.ipynb
'Shal Ritvik Sinha_MT_Q1

In [6]:
from pyspark.sql import SQLContext
from pyspark.sql import DataFrameNaFunctions
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.feature import Binarizer
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer

In [8]:
# Load the daily weather CSV through the SparkSession's built-in CSV reader.
# SQLContext is deprecated since Spark 3.0, and 'com.databricks.spark.csv' is
# the legacy name of the former external spark-csv package; `spark.read.csv`
# is the supported equivalent.
#   header=True      -> the first line supplies the column names
#   inferSchema=True -> column types are inferred from the data
# The path is kept absolute so it does not depend on the current working directory.
df = spark.read.csv(
    '/content/drive/My Drive/BigDataColab25FallShalRitvikSinha/daily_weather.csv',
    header=True,
    inferSchema=True,
)

df.columns



['number',
 'air_pressure_9am',
 'air_temp_9am',
 'avg_wind_direction_9am',
 'avg_wind_speed_9am',
 'max_wind_direction_9am',
 'max_wind_speed_9am',
 'rain_accumulation_9am',
 'rain_duration_9am',
 'relative_humidity_9am',
 'relative_humidity_3pm']

In [9]:
# Input columns that will be assembled into the model's feature vector
# (all 9am measurements listed here; order is preserved in the vector).
featureColumns = [
    'air_pressure_9am',
    'air_temp_9am',
    'avg_wind_direction_9am',
    'avg_wind_speed_9am',
    'max_wind_direction_9am',
    'max_wind_speed_9am',
    'rain_accumulation_9am',
    'rain_duration_9am',
]

In [10]:
# Remove the 'number' column; it is not one of the feature columns and is not
# used to build the label.
df = df.drop('number')

In [11]:
# Discard every row that contains at least one null value.
df = df.dropna()

In [12]:
# Shape of the cleaned frame as (row count, column count).
n_rows = df.count()
n_cols = len(df.columns)
n_rows, n_cols

(1064, 10)

In [13]:
# Derive the binary target from the 3pm relative humidity. Binarizer emits 1.0
# for values strictly greater than the threshold and 0.0 otherwise, so a
# threshold of 24.99999 approximates a ">= 25" cut-off.
binarizer = Binarizer(
    inputCol="relative_humidity_3pm",
    outputCol="label",
    threshold=24.99999,
)
binarizedDF = binarizer.transform(df)

In [14]:
# Spot-check the first 4 rows: raw humidity next to the derived label.
label_preview = binarizedDF.select("relative_humidity_3pm", "label")
label_preview.show(4)

+---------------------+-----+
|relative_humidity_3pm|label|
+---------------------+-----+
|   36.160000000000494|  1.0|
|     19.4265967985621|  0.0|
|   14.460000000000045|  0.0|
|   12.742547353761848|  0.0|
+---------------------+-----+
only showing top 4 rows



In [15]:
# Pack the selected feature columns into a single vector column named
# "features", which is the column the classifier reads.
assembler = VectorAssembler(
    outputCol="features",
    inputCols=featureColumns,
)
assembled = assembler.transform(binarizedDF)

In [16]:
# 80/20 train/test split; the fixed seed keeps the split repeatable.
split_weights = [0.8, 0.2]
trainingData, testData = assembled.randomSplit(split_weights, seed=13234)

In [17]:
# Row counts of the two splits as (training, test).
train_rows = trainingData.count()
test_rows = testData.count()
train_rows, test_rows

(846, 218)

In [19]:
# Decision-tree classifier configuration: Gini impurity, depth capped at 5,
# and at least 20 training instances required in each child node.
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="label",
    impurity="gini",
    maxDepth=5,
    minInstancesPerNode=20,
)

In [24]:
# Pipeline is already imported in the notebook's imports cell, so it is not
# re-imported here (keeps all imports in one place).
# Wrap the classifier in a single-stage pipeline and fit it on the training split.
pipeline = Pipeline(stages=[dt])
model = pipeline.fit(trainingData)

In [25]:
# Score the held-out test split with the fitted pipeline model.
predictions = model.transform(testData)

In [26]:
# Show the first 10 predictions side by side with the true labels.
prediction_vs_label = predictions.select("prediction", "label")
prediction_vs_label.show(10)

+----------+-----+
|prediction|label|
+----------+-----+
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       1.0|  1.0|
|       0.0|  0.0|
|       1.0|  1.0|
+----------+-----+
only showing top 10 rows



In [28]:
# Persist the (prediction, label) pairs as CSV with a header row.
# Note: Spark writes a *directory* named predictions.csv that contains part files.
# mode='overwrite' makes this cell re-runnable: with the default save mode the
# write raises an error when the target path already exists, which breaks a
# second run of the notebook. The built-in 'csv' writer replaces the legacy
# 'com.databricks.spark.csv' format name.
predictions.select("prediction", "label").write.csv(
    '/content/drive/My Drive/BigDataColab25FallShalRitvikSinha/predictions.csv',
    header=True,
    mode='overwrite',
)