<a href="https://colab.research.google.com/github/Ngugisenior/data_analytics/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1. Import Libraries



In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 65 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 62.9 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=ed63666cc7203b25e3b24c65006db0508263492b439f2eba715bea8f616f4764
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2


In [78]:
import pyspark
import pandas as pd
import datetime as dt
from datetime import date
import requests
import json

In [90]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType
from urllib.request import urlopen

# 2. PySpark Session

In [6]:
spark = SparkSession.builder.appName('PySpark Tutorial').getOrCreate()

# 3. Import Json Data

In [53]:
url = 'https://ergast.com/api/f1/drivers.json?limit=60'
j_data = requests.get(url)
f1 = j_data.json()

# 4. Read JSON data to a PySpark dataframe

In [60]:
rdd = spark.sparkContext.parallelize([f1['MRData']['DriverTable']['Drivers']])
df_spark = spark.read.option("multiLine", True).json(rdd)

# 5. Show Dataframe Data



In [69]:
df_spark.show()

+----+-----------+------------+-----------+----------+-----------+---------------+--------------------+
|code|dateOfBirth|    driverId| familyName| givenName|nationality|permanentNumber|                 url|
+----+-----------+------------+-----------+----------+-----------+---------------+--------------------+
|null| 1932-07-10|       abate|      Abate|     Carlo|    Italian|           null|http://en.wikiped...|
|null| 1913-03-21|   abecassis|  Abecassis|    George|    British|           null|http://en.wikiped...|
|null| 1957-11-27|     acheson|    Acheson|     Kenny|    British|           null|http://en.wikiped...|
|null| 1969-11-19|       adams|      Adams|  Philippe|    Belgian|           null|http://en.wikiped...|
|null| 1913-12-15|        ader|       Ader|      Walt|   American|           null|http://en.wikiped...|
|null| 1921-11-05|      adolff|     Adolff|      Kurt|     German|           null|http://en.wikiped...|
|null| 1913-08-21|  agabashian| Agabashian|      Fred|   America

# 6. Show DataFrame Schema

In [62]:
df_spark.printSchema()

root
 |-- code: string (nullable = true)
 |-- dateOfBirth: string (nullable = true)
 |-- driverId: string (nullable = true)
 |-- familyName: string (nullable = true)
 |-- givenName: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- permanentNumber: string (nullable = true)
 |-- url: string (nullable = true)



# 7. DataFrame columns

In [63]:
df_spark.columns

['code',
 'dateOfBirth',
 'driverId',
 'familyName',
 'givenName',
 'nationality',
 'permanentNumber',
 'url']

# 8. Top N columns

In [70]:
df_spark.head(n=5)

[Row(code=None, dateOfBirth='1932-07-10', driverId='abate', familyName='Abate', givenName='Carlo', nationality='Italian', permanentNumber=None, url='http://en.wikipedia.org/wiki/Carlo_Mario_Abate'),
 Row(code=None, dateOfBirth='1913-03-21', driverId='abecassis', familyName='Abecassis', givenName='George', nationality='British', permanentNumber=None, url='http://en.wikipedia.org/wiki/George_Abecassis'),
 Row(code=None, dateOfBirth='1957-11-27', driverId='acheson', familyName='Acheson', givenName='Kenny', nationality='British', permanentNumber=None, url='http://en.wikipedia.org/wiki/Kenny_Acheson'),
 Row(code=None, dateOfBirth='1969-11-19', driverId='adams', familyName='Adams', givenName='Philippe', nationality='Belgian', permanentNumber=None, url='http://en.wikipedia.org/wiki/Philippe_Adams'),
 Row(code=None, dateOfBirth='1913-12-15', driverId='ader', familyName='Ader', givenName='Walt', nationality='American', permanentNumber=None, url='http://en.wikipedia.org/wiki/Walt_Ader')]

# 9. Select a single DataFrame column

In [72]:
df_spark.select(['givenName']).show()

+----------+
| givenName|
+----------+
|     Carlo|
|    George|
|     Kenny|
|  Philippe|
|      Walt|
|      Kurt|
|      Fred|
|      Kurt|
|      Jack|
|Christijan|
| Alexander|
|   Michele|
|      Jean|
|     Jaime|
|  Philippe|
|     Cliff|
|  Fernando|
|  Giovanna|
|       Red|
|    George|
+----------+
only showing top 20 rows



# 10. Select multiple columns from a PySpark DataFrame

In [73]:
df_spark.select(['givenName', 'url']).show()

+----------+--------------------+
| givenName|                 url|
+----------+--------------------+
|     Carlo|http://en.wikiped...|
|    George|http://en.wikiped...|
|     Kenny|http://en.wikiped...|
|  Philippe|http://en.wikiped...|
|      Walt|http://en.wikiped...|
|      Kurt|http://en.wikiped...|
|      Fred|http://en.wikiped...|
|      Kurt|http://en.wikiped...|
|      Jack|http://en.wikiped...|
|Christijan|http://en.wikiped...|
| Alexander|http://en.wikiped...|
|   Michele|http://en.wikiped...|
|      Jean|http://en.wikiped...|
|     Jaime|http://en.wikiped...|
|  Philippe|http://en.wikiped...|
|     Cliff|http://en.wikiped...|
|  Fernando|http://en.wikiped...|
|  Giovanna|http://en.wikiped...|
|       Red|http://en.wikiped...|
|    George|http://en.wikiped...|
+----------+--------------------+
only showing top 20 rows



# 11. Checking the column datatypes

In [74]:
df_spark.dtypes

[('code', 'string'),
 ('dateOfBirth', 'string'),
 ('driverId', 'string'),
 ('familyName', 'string'),
 ('givenName', 'string'),
 ('nationality', 'string'),
 ('permanentNumber', 'string'),
 ('url', 'string')]

# 12. Describe function similar to pandas

In [77]:
df_spark.describe().show()

+-------+----+-----------+--------------+----------+---------+-----------+-----------------+--------------------+
|summary|code|dateOfBirth|      driverId|familyName|givenName|nationality|  permanentNumber|                 url|
+-------+----+-----------+--------------+----------+---------+-----------+-----------------+--------------------+
|  count|   7|         60|            60|        60|       60|         60|                3|                  60|
|   mean|null|       null|          null|      null|     null|       null|             42.0|                null|
| stddev|null|       null|          null|      null|     null|       null|40.95119045888654|                null|
|    min| AIT| 1900-03-29|         abate|     Abate|  Alberto|   American|               14|http://en.wikiped...|
|    max| BAR| 1996-03-23|mario_andretti|     Behra|     Élie|       Thai|               89|http://en.wikiped...|
+-------+----+-----------+--------------+----------+---------+-----------+--------------

# 13. Adding columns to a PySpark DataFrame

In [132]:
# Lets add a column named AGE which is the expected age of the driver today
df_spark = df_spark.withColumn('dateOfBirth',  col('dateOfBirth').cast(DateType()))

xx = dt.datetime.now()
df_spark = df_spark.withColumn('age', (date(xx.year,xx.month,xx.day)-df_spark['dateOfBirth']))
df_spark.show()

+----+-----------+------------+-----------+----------+-----------+---------------+--------------------+--------------------+
|code|dateOfBirth|    driverId| familyName| givenName|nationality|permanentNumber|                 url|                 age|
+----+-----------+------------+-----------+----------+-----------+---------------+--------------------+--------------------+
|null| 1932-07-10|       abate|      Abate|     Carlo|    Italian|           null|http://en.wikiped...|   89 years 2 months|
|null| 1913-03-21|   abecassis|  Abecassis|    George|    British|           null|http://en.wikiped...|108 years 5 month...|
|null| 1957-11-27|     acheson|    Acheson|     Kenny|    British|           null|http://en.wikiped...|63 years 9 months...|
|null| 1969-11-19|       adams|      Adams|  Philippe|    Belgian|           null|http://en.wikiped...|51 years 9 months...|
|null| 1913-12-15|        ader|       Ader|      Walt|   American|           null|http://en.wikiped...|107 years 8 month...|


# 14. Dropping acolumn from a Pyspark dataFrame

In [134]:
df_spark = df_spark.drop('age')
df_spark.show()

+----+-----------+------------+-----------+----------+-----------+---------------+--------------------+
|code|dateOfBirth|    driverId| familyName| givenName|nationality|permanentNumber|                 url|
+----+-----------+------------+-----------+----------+-----------+---------------+--------------------+
|null| 1932-07-10|       abate|      Abate|     Carlo|    Italian|           null|http://en.wikiped...|
|null| 1913-03-21|   abecassis|  Abecassis|    George|    British|           null|http://en.wikiped...|
|null| 1957-11-27|     acheson|    Acheson|     Kenny|    British|           null|http://en.wikiped...|
|null| 1969-11-19|       adams|      Adams|  Philippe|    Belgian|           null|http://en.wikiped...|
|null| 1913-12-15|        ader|       Ader|      Walt|   American|           null|http://en.wikiped...|
|null| 1921-11-05|      adolff|     Adolff|      Kurt|     German|           null|http://en.wikiped...|
|null| 1913-08-21|  agabashian| Agabashian|      Fred|   America