<a href="https://colab.research.google.com/github/WanjohiChristopher/Apache-Pyspark/blob/main/Pyspark_DE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive at /content/drive (google.colab is only available on Colab).
# The CSV read later in this notebook is loaded from a path under this mount.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# install py4j
!pip install py4j

# install java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)

!wget -q https://archive.apache.org/dist/spark/spark-3.0.2/spark-3.0.2-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.2-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.2-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

# Install pyspark
!pip install pyspark

Collecting py4j
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[?25l[K     |█▋                              | 10 kB 32.3 MB/s eta 0:00:01[K     |███▎                            | 20 kB 20.7 MB/s eta 0:00:01[K     |█████                           | 30 kB 16.3 MB/s eta 0:00:01[K     |██████▋                         | 40 kB 14.6 MB/s eta 0:00:01[K     |████████▎                       | 51 kB 16.2 MB/s eta 0:00:01[K     |█████████▉                      | 61 kB 12.9 MB/s eta 0:00:01[K     |███████████▌                    | 71 kB 14.4 MB/s eta 0:00:01[K     |█████████████▏                  | 81 kB 15.9 MB/s eta 0:00:01[K     |██████████████▉                 | 92 kB 16.8 MB/s eta 0:00:01[K     |████████████████▌               | 102 kB 15.2 MB/s eta 0:00:01[K     |██████████████████▏             | 112 kB 15.2 MB/s eta 0:00:01[K     |███████████████████▊            | 122 kB 15.2 MB/s eta 0:00:01[K     |█████████████████████▍          | 133 kB 15.2 MB/s eta 

In [None]:
# findspark locates the Spark installation (SPARK_HOME is set in the install cell)
# so that the pyspark imports in the following cells resolve.
import findspark
findspark.init()

In [None]:
import pyspark
from pyspark.sql import SparkSession

# getOrCreate is a method and must be *called*: without the parentheses `spark`
# is bound to the method object itself rather than to a SparkSession.
spark = SparkSession.builder.getOrCreate()
spark

<bound method SparkSession.Builder.getOrCreate of <pyspark.sql.session.SparkSession.Builder object at 0x7f398ad54890>>

In [None]:
# Import dependencies
from pyspark.sql import functions as F
from pyspark.sql import types as T

# StringIndexer is similar to a label encoder: it assigns a numeric index to each category
# OneHotEncoder creates a one-hot encoded vector
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# VectorAssembler combines feature columns into a single vector column; modeling takes that vector as input
from pyspark.ml.feature import VectorAssembler

# DecisionTreeClassifier is used for classification problems
from pyspark.ml.classification import DecisionTreeClassifier
# LogisticRegression is also a classifier (it lives in pyspark.ml.classification), not a regression model
from pyspark.ml.classification import LogisticRegression

In [None]:
# Read the weather history CSV from the mounted Drive into a Spark DataFrame.
# NOTE(review): the app name "Classify Urls" does not describe this weather
# dataset; it is kept unchanged because it is a runtime string.
spark = SparkSession.builder.master("local").appName("Classify Urls").getOrCreate()
df = spark.read.csv('/content/drive/MyDrive/Weatherdata/weatherHistory.csv',
                    header=True,                    # first line holds the column names
                    inferSchema=True,               # without this every column is loaded as string
                    multiLine=True,                 # allow quoted fields that span several lines
                    ignoreLeadingWhiteSpace=True,
                    ignoreTrailingWhiteSpace=True,
                    encoding="UTF-8",
                    sep=',',
                    quote='"',
                    escape='"'                      # a doubled quote inside a quoted field is a literal quote
                    )

In [None]:
# Preview the first 10 rows (long string values are truncated in the printed table)
df.show(10)

+--------------------+-------------+-----------+-----------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|      Formatted Date|      Summary|Precip Type|  Temperature (C)|Apparent Temperature (C)|Humidity| Wind Speed (km/h)|Wind Bearing (degrees)|   Visibility (km)|Loud Cover|Pressure (millibars)|       Daily Summary|
+--------------------+-------------+-----------+-----------------+------------------------+--------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|2006-04-01 00:00:...|Partly Cloudy|       rain|9.472222222222221|      7.3888888888888875|    0.89|           14.1197|                 251.0|15.826300000000002|       0.0|             1015.13|Partly cloudy thr...|
|2006-04-01 01:00:...|Partly Cloudy|       rain|9.355555555555558|       7.227777777777776|    0.86|           14.2646|                 259.

In [None]:
# List the DataFrame's column names
df.columns

['Formatted Date',
 'Summary',
 'Precip Type',
 'Temperature (C)',
 'Apparent Temperature (C)',
 'Humidity',
 'Wind Speed (km/h)',
 'Wind Bearing (degrees)',
 'Visibility (km)',
 'Loud Cover',
 'Pressure (millibars)',
 'Daily Summary']

In [None]:
# Inspect the (column name, data type) pair of every column
df.dtypes

[('Formatted Date', 'string'),
 ('Summary', 'string'),
 ('Precip Type', 'string'),
 ('Temperature (C)', 'string'),
 ('Apparent Temperature (C)', 'string'),
 ('Humidity', 'string'),
 ('Wind Speed (km/h)', 'string'),
 ('Wind Bearing (degrees)', 'string'),
 ('Visibility (km)', 'string'),
 ('Loud Cover', 'string'),
 ('Pressure (millibars)', 'string'),
 ('Daily Summary', 'string')]

In [None]:
# Print the schema as a tree: column name, data type and nullability
df.printSchema()

root
 |-- Formatted Date: string (nullable = true)
 |-- Summary: string (nullable = true)
 |-- Precip Type: string (nullable = true)
 |-- Temperature (C): string (nullable = true)
 |-- Apparent Temperature (C): string (nullable = true)
 |-- Humidity: string (nullable = true)
 |-- Wind Speed (km/h): string (nullable = true)
 |-- Wind Bearing (degrees): string (nullable = true)
 |-- Visibility (km): string (nullable = true)
 |-- Loud Cover: string (nullable = true)
 |-- Pressure (millibars): string (nullable = true)
 |-- Daily Summary: string (nullable = true)



In [None]:
# Count the number of rows in the DataFrame
df.count()

96453

In [None]:
# Compute summary statistics (count, mean, ...) for every column and print them
df.describe().show()

+-------+--------------------+--------------------+-----------+--------------------+------------------------+-------------------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|summary|      Formatted Date|             Summary|Precip Type|     Temperature (C)|Apparent Temperature (C)|           Humidity| Wind Speed (km/h)|Wind Bearing (degrees)|   Visibility (km)|Loud Cover|Pressure (millibars)|       Daily Summary|
+-------+--------------------+--------------------+-----------+--------------------+------------------------+-------------------+------------------+----------------------+------------------+----------+--------------------+--------------------+
|  count|               96453|               96453|      96453|               96453|                   96453|              96453|             96453|                 96453|             96453|     96453|               96453|               96453|
|   mean|               

In [None]:
# Drop rows that contain a null in any column.
# NOTE(review): the describe() output reports a non-null count of 96453 for every
# column, equal to the row count, so this probably removes nothing; missing values
# may be encoded as literal strings in the CSV instead - TODO confirm.
# Rebinding `df` makes the unfiltered frame unreachable after this cell.
df = df.dropna() 

In [None]:

# Dependent variable (target): the 'Summary' column, at position 1.
y = df.columns[1]

# Independent variables (features). The original `df.columns[-1]` selected only
# the single free-text 'Daily Summary' column as a plain string, which cannot be
# used as VectorAssembler `inputCols` (that needs a list of feature columns).
# Keep the measurement columns and leave out the target, the timestamp and the
# text/categorical columns.
# NOTE(review): 'Precip Type' is categorical; index/encode it (StringIndexer /
# OneHotEncoder) before adding it to the features. The remaining columns must
# have a numeric dtype before they are assembled - confirm with df.dtypes.
non_feature_cols = {'Formatted Date', 'Precip Type', 'Daily Summary', y}
x = [c for c in df.columns if c not in non_feature_cols]


In [None]:
# Display the name of the dependent-variable (target) column
y

'Summary'

In [None]:

from pyspark.ml.feature import VectorAssembler

# `inputCols` must be a list of column names; wrap a single name in a list so
# the assembler can be constructed whether `x` is one name or several.
feature_cols = [x] if isinstance(x, str) else list(x)

# Write the assembled vector to a new 'features' column. The original used
# outputCol='Summary', which is the name of the existing target column, so the
# output would have collided with it.
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')