In [None]:
!pip install pyspark==3.0.1 py4j==0.10.9 

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.0.1
  Downloading pyspark-3.0.1.tar.gz (204.2 MB)
[K     |████████████████████████████████| 204.2 MB 34 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 40.7 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612243 sha256=e8dcae3322d62f412104771cb2141055c959f208340e7b8d70859828801f47e3
  Stored in directory: /root/.cache/pip/wheels/5e/34/fa/b37b5cef503fc5148b478b2495043ba61b079120b7ff379f9b
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [None]:
from google.colab import drive

# Mount Google Drive at /content/drive so the files under "My Drive" can be read.
drive.mount(mountpoint='/content/drive')

Mounted at /content/drive


**SparkSession**

Since version 2.0, SparkSession has been the entry point to PySpark; earlier versions used SparkContext as the entry point. SparkSession gives programmatic access to the underlying PySpark functionality for creating RDDs and DataFrames (the typed Dataset API is available only in Scala and Java). It can be used in place of SQLContext, HiveContext, and the other contexts defined before 2.0. Internally, SparkSession creates a SparkConf and a SparkContext from the configuration supplied to it. A SparkSession is created using the `SparkSession.builder` builder pattern.




**Creating SparkSession**

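To create a SparkSession, use the builder pattern: start from `SparkSession.builder` (in PySpark this is an attribute, not a method call), chain configuration calls such as `master()` and `appName()`, and finish with `getOrCreate()`.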


In [None]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) a local SparkSession.
# In "local[*]" the '*' means: use all the CPU cores of this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('PySpark_For_Beginers')
    .getOrCreate()
)

**Reading Data and Structuring Data Using Spark Schema**

PySpark can read data from various file formats such as Comma-Separated Values (CSV), JavaScript Object Notation (JSON), Parquet, etc. To read the different file formats we use `spark.read`.

In [None]:
# Folder on the mounted Google Drive that holds the CSV files; the trailing
# slash matters because file names are appended to it with `+`.
path = "/content/drive/My Drive/PySpark/"

In [None]:
# Load the U.S. stock price data (January 2019 - July 2020, from Kaggle datasets).
# No schema is supplied here, so every column comes back as a string.
data = (
    spark.read.format('csv')
    .option('sep', ',')
    .option('header', True)
    .load(path + 'stocks_price_final.csv')
)

# printSchema() prints the column names and types as a tree.
data.printSchema()


root
 |-- _c0: string (nullable = true)
 |-- symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open: string (nullable = true)
 |-- high: string (nullable = true)
 |-- low: string (nullable = true)
 |-- close: string (nullable = true)
 |-- volume: string (nullable = true)
 |-- adjusted: string (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



**Spark schema** is the structure of a DataFrame. We can define it using the **StructType** class, which is a collection of **StructField**s; each StructField defines the column name (String), column type (DataType), whether the column is nullable (Boolean), and metadata (MetaData). Spark can infer a schema from the data, but the CSV reader does not infer column types unless `inferSchema=True` is set, which is why every column above was read as a string. Even inferred types may be wrong, and we may want to define our own column names and data types, especially when working with unstructured and semi-structured data.

In [None]:
from pyspark.sql.types import (
    DateType,
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)

# Explicit schema for stocks_price_final.csv. Fields are matched to the CSV
# columns by position (the header names are not used), so the order must
# follow the file's column order.
data_schema = [
               StructField('_c0', IntegerType(), True),
               StructField('symbol', StringType(), True),
               # NOTE(review): 'data' looks like a typo for 'date', but later
               # cells reference data.data, so the name is kept.
               StructField('data', DateType(), True),
               StructField('open', DoubleType(), True),
               StructField('high', DoubleType(), True),
               StructField('low', DoubleType(), True),
               StructField('close', DoubleType(), True),
               # LongType, not IntegerType: IntegerType cannot hold values above
               # 2,147,483,647, and values that fail to parse are silently read
               # as null. NOTE(review): with IntegerType, describe() reported
               # ~1.1k fewer non-null `volume` values than `open` values,
               # presumably from such overflows — confirm against the raw CSV.
               StructField('volume', LongType(), True),
               StructField('adjusted', DoubleType(), True),
               StructField('market.cap', StringType(), True),
               StructField('sector', StringType(), True),
               StructField('industry', StringType(), True),
               StructField('exchange', StringType(), True),
            ]

final_struc = StructType(fields = data_schema)

# Re-read the CSV with the explicit schema and check the resulting column types.
data = spark.read.csv(
    path + 'stocks_price_final.csv',
    sep = ',',
    header = True,
    schema = final_struc
    )

data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- symbol: string (nullable = true)
 |-- data: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted: double (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



**Different Methods to Inspect Data**

In [None]:
# printSchema(): prints the schema of the data (dataframe) as a tree of
# column name, type and nullability

data.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- symbol: string (nullable = true)
 |-- data: date (nullable = true)
 |-- open: double (nullable = true)
 |-- high: double (nullable = true)
 |-- low: double (nullable = true)
 |-- close: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted: double (nullable = true)
 |-- market.cap: string (nullable = true)
 |-- sector: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- exchange: string (nullable = true)



In [None]:
# dtypes: returns a list of (column name, data type) tuples, one per column

data.dtypes

[('_c0', 'int'),
 ('symbol', 'string'),
 ('data', 'date'),
 ('open', 'double'),
 ('high', 'double'),
 ('low', 'double'),
 ('close', 'double'),
 ('volume', 'int'),
 ('adjusted', 'double'),
 ('market.cap', 'string'),
 ('sector', 'string'),
 ('industry', 'string'),
 ('exchange', 'string')]

In [None]:
# head(n): returns the first n rows as a Python list of Row objects

data.head(n=5)

[Row(_c0=1, symbol='TXG', data=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=2, symbol='TXG', data=datetime.date(2019, 9, 13), open=52.75, high=54.355, low=49.150002, close=52.27, volume=1025200, adjusted=52.27, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=3, symbol='TXG', data=datetime.date(2019, 9, 16), open=52.450001, high=56.0, low=52.009998, close=55.200001, volume=269900, adjusted=55.200001, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=4, symbol='TXG', data=datetime.date(2019, 9, 17), open=56.209999, high=60.900002, low=55.423, close=56.779999, volume=602800, adjusted=56.779999, market.cap='$9.31B', sector='Capit

In [None]:
# show(n): prints the first n rows as a text table (n defaults to 20; long
# cell values are truncated)

data.show(n=20)

+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|_c0|symbol|      data|     open|     high|      low|    close| volume| adjusted|market.cap|       sector|            industry|exchange|
+---+------+----------+---------+---------+---------+---------+-------+---------+----------+-------------+--------------------+--------+
|  1|   TXG|2019-09-12|     54.0|     58.0|     51.0|    52.75|7326300|    52.75|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  2|   TXG|2019-09-13|    52.75|   54.355|49.150002|    52.27|1025200|    52.27|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  3|   TXG|2019-09-16|52.450001|     56.0|52.009998|55.200001| 269900|55.200001|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  4|   TXG|2019-09-17|56.209999|60.900002|   55.423|56.779999| 602800|56.779999|    $9.31B|Capital Goods|Biotechnology: La...|  NASDAQ|
|  5|   TXG|2019-09-18|56.849998|    62.2

In [None]:
# first(): returns the first row of the data as a single Row (not a list)

data.first()

Row(_c0=1, symbol='TXG', data=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ')

In [None]:
# take(num): returns the first num rows as a list of Row objects

data.take(num=5)

[Row(_c0=1, symbol='TXG', data=datetime.date(2019, 9, 12), open=54.0, high=58.0, low=51.0, close=52.75, volume=7326300, adjusted=52.75, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=2, symbol='TXG', data=datetime.date(2019, 9, 13), open=52.75, high=54.355, low=49.150002, close=52.27, volume=1025200, adjusted=52.27, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=3, symbol='TXG', data=datetime.date(2019, 9, 16), open=52.450001, high=56.0, low=52.009998, close=55.200001, volume=269900, adjusted=55.200001, market.cap='$9.31B', sector='Capital Goods', industry='Biotechnology: Laboratory Analytical Instruments', exchange='NASDAQ'),
 Row(_c0=4, symbol='TXG', data=datetime.date(2019, 9, 17), open=56.209999, high=60.900002, low=55.423, close=56.779999, volume=602800, adjusted=56.779999, market.cap='$9.31B', sector='Capit

In [None]:
# describe(): computes summary statistics (count, mean, ...) per column and
# returns them as a DataFrame. String columns are included as well (their mean
# is null); the date column 'data' does not appear in the result.

data.describe().show()

+-------+-----------------+-------+------------------+------------------+------------------+------------------+------------------+------------------+----------+----------------+--------------------+--------+
|summary|              _c0| symbol|              open|              high|               low|             close|            volume|          adjusted|market.cap|          sector|            industry|exchange|
+-------+-----------------+-------+------------------+------------------+------------------+------------------+------------------+------------------+----------+----------------+--------------------+--------+
|  count|          1729034|1729034|           1726301|           1726301|           1726301|           1726301|           1725207|           1726301|   1729034|         1729034|             1729034| 1729034|
|   mean|         864517.5|   null|15070.071703341051| 15555.06726813709|14557.808227578982|15032.714854330707|1397692.1627885813|  14926.1096887955|      null|        

In [None]:
# columns: returns a list of the column names of the data

data.columns

In [None]:
# count(): returns the number of rows in the data (this runs a Spark job)

data.count()

In [None]:
# distinct(): returns a new DataFrame containing only the unique rows. On its
# own it is lazy and only displays a DataFrame repr, so count() is chained to
# get the number of distinct rows.

data.distinct().count()

**Columns Manipulation**

There are different methods to add, rename, and delete columns of the data. Each returns a new DataFrame, so the result is assigned back to `data`.

In [None]:
# Adding a column: withColumn(name, column) returns a new DataFrame with the
# extra column. Here 'date' is a copy of the existing 'data' column.

data = data.withColumn('date', data['data'])
data.show()

In [None]:
# Renaming a column: withColumnRenamed(existing, new) returns a new DataFrame in
# which the existing column ('date') is renamed to the new name ('data_changed').

data = data.withColumnRenamed('date', 'data_changed')
data.show()

In [None]:
# Deleting a column: drop(name) returns a new DataFrame without that column
# (here removing 'data_changed', the column added and renamed above).

data = data.drop('data_changed')
data.show()

**Dealing with Missing Values**

We often encounter missing values when dealing with real-world data. Missing values may be encoded as NaN, blanks, or placeholder values. There are various techniques to deal with them; some of the popular ones are:

**Remove**: Remove the rows having missing values in any one of the columns.

**Impute with Mean/Median**: Replace the missing values using the Mean/Median of the respective column. It’s easy, fast, and works well with small numeric datasets.

**Impute with Most Frequent Values**: As the name suggests, replace the missing values of a column with the most frequent value in that column. This works well with categorical features but may introduce bias into the data.

**Impute using KNN**: K-Nearest Neighbors finds the data points most similar to the one with the missing value, using distance metrics such as Euclidean, Mahalanobis, Manhattan, Minkowski, and Hamming, and fills the missing value from those neighbors. Depending on the dataset it can be more accurate than the methods above, but it is computationally expensive and sensitive to outliers.

In [None]:
# Using the Dataset: https://www.kaggle.com/datasets/dansbecker/melbourne-housing-snapshot?select=melb_data.csv
# NOTE(review): no header=True is passed, so the columns are named _c0, _c1, ...
# and the file's header line (if it has one) is read as an ordinary data row;
# with no schema, every column is read as a string. Later cells rely on the
# _cN names (e.g. '_c14').
# NOTE(review): this path differs from the Drive `path` used for the stock
# data — confirm where melb_data.csv actually lives.

data1 = spark.read.csv('/var/melb_data.csv')
data1.show()

In [None]:
# Remove every row that has a missing (null) value in any column
# (dropna is the DataFrame-level form of na.drop)

data1.dropna(how='any').show()

In [None]:
# Fill missing values in column _c14 with the string 'CMC Global'
# (fillna is the DataFrame-level form of na.fill)

data1.fillna('CMC Global', subset=['_c14']).show()

**Imputing NA values with measures of central tendency**

A more principled way to handle missing values is to impute them with a measure of central tendency (mean or median), chosen according to the domain of the dataset. Here we use the `Imputer` estimator from PySpark's ML library. In Spark 3.0.1, the version installed above, `Imputer` supports the `mean` and `median` strategies; the `mode` strategy was added in Spark 3.1.

In [None]:
from pyspark.ml.feature import Imputer

# Imputer that replaces missing values in `open` with the column mean and
# writes the result to a new column, `open_imputed`.
imputer = Imputer(
    inputCols=['open'],
    outputCols=['open_imputed'],
    strategy='mean',
)

**Fit and Transform**

`Imputer` is an estimator: `fit()` computes the statistic (here, the mean of `open`) and returns a model, and calling `transform()` on that model returns a new DataFrame with the imputed values in the `open_imputed` column. We chain the two calls to see the result.


In [None]:
# fit() computes the mean of `open` and returns a model; transform() adds the
# `open_imputed` column.
imputer_model = imputer.fit(data)
imputer_model.transform(data).show()

**Querying Data**

PySpark and PySpark SQL provide a wide range of methods and functions to query data with ease. Here are a few of the most commonly used ones:

- Select
- Filter
- Between
- When
- Like
- GroupBy
- Aggregations






In [None]:
# SELECT: choose one or more columns by name.

# A single column
data.select('sector').show(5)

# Several columns, passed as separate arguments instead of as a list
data.select('open', 'close', 'adjusted').show(5)

In [None]:
# FILTER: keep only the rows that satisfy a condition. Conditions are combined
# with & (AND), | (OR) and ~ (NOT); wrap each comparison in parentheses.

from pyspark.sql.functions import col, lit

data.filter((col('open') > 54) & (col('low') == 51)).show(5)


**Data Visualization**

We use matplotlib and pandas to visualize the data: the `toPandas()` method converts the (aggregated) Spark DataFrame into a pandas DataFrame, and we then call its `plot()` method. The code below displays a bar chart of the average opening, closing, and adjusted stock price per sector.





In [None]:
import matplotlib.pyplot as plt

In [None]:
# Average open / close / adjusted price per sector. The aggregation runs in
# Spark; only the per-sector result is collected to pandas for plotting.
sec_df = (
    data.select(['sector', 'open', 'close', 'adjusted'])
        .groupBy('sector')
        .mean()
        # groupBy output order is not deterministic between runs; sort so the
        # positional exclusion below always removes the same sector.
        .orderBy('sector')
        .toPandas()
)

# Row position (after sorting by sector) that is left out of the chart.
# NOTE(review): presumably an outlier sector whose mean price dwarfs the
# others — TODO replace with an explicit sector name once confirmed.
EXCLUDED_ROW = 6

# Every row except EXCLUDED_ROW; derived from the actual row count instead of
# assuming exactly 12 sectors.
ind = [i for i in range(len(sec_df)) if i != EXCLUDED_ROW]

sec_df.iloc[ind, :].plot(kind='bar', x='sector', y=sec_df.columns.tolist()[1:],
                         figsize=(12, 6), ylabel='Stock Price', xlabel='Sector',
                         title='Average stock price by sector')
plt.show()