In [None]:
!pip install pyspark --quiet
!pip install -U -q PyDrive --quiet
!apt install openjdk-8-jdk-headless &> /dev/null

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [None]:
import os

# Point PySpark at the OpenJDK 8 installed by the setup cell, so it is in effect
# when the SparkSession below launches its JVM.
os.environ.update({"JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64"})

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the notebook's SparkSession. The UI port is fixed at 4050 so the
# ngrok cell further down (`./ngrok http 4050`) can tunnel to it.
# NOTE(review): the app name looks left over from another project; it only affects
# how the application is labelled, not the analysis.
spark = (
    SparkSession.builder
    .appName("civComplaints")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)

In [None]:
# Display the SparkSession created above as the cell's output.
spark

In [None]:
# Download the sample books dataset to /tmp/sample_books.json.
# --continue resumes a partially downloaded file instead of starting over.
!wget --continue https://raw.githubusercontent.com/GarvitArya/pyspark-demo/main/sample_books.json -O /tmp/sample_books.json

--2023-10-13 16:16:51--  https://raw.githubusercontent.com/GarvitArya/pyspark-demo/main/sample_books.json
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.109.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.109.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1565 (1.5K) [text/plain]
Saving to: ‘/tmp/sample_books.json’


2023-10-13 16:16:51 (16.6 MB/s) - ‘/tmp/sample_books.json’ saved [1565/1565]



In [None]:
# Load the downloaded JSON file; Spark infers the schema from the data.
df = spark.read.format("json").load("/tmp/sample_books.json")

In [None]:
# Inspect the column names and types Spark inferred from the JSON file.
df.printSchema()

root
 |-- author: string (nullable = true)
 |-- edition: string (nullable = true)
 |-- price: double (nullable = true)
 |-- title: string (nullable = true)
 |-- year_written: long (nullable = true)



In [None]:
# Preview the first 4 rows without truncating long values.
df.show(n=4, truncate=False)

+---------------+--------------+-----+----------------+------------+
|author         |edition       |price|title           |year_written|
+---------------+--------------+-----+----------------+------------+
|Austen, Jane   |Penguin       |18.2 |Northanger Abbey|1814        |
|Tolstoy, Leo   |Penguin       |12.7 |War and Peace   |1865        |
|Tolstoy, Leo   |Penguin       |13.5 |Anna Karenina   |1875        |
|Woolf, Virginia|Harcourt Brace|25.0 |Mrs. Dalloway   |1925        |
+---------------+--------------+-----+----------------+------------+
only showing top 4 rows



In [None]:
# Total number of rows; count() is an action, so this runs a Spark job.
df.count()

13

In [None]:
# Project three columns and show the first 5 rows.
df.select(["title", "price", "year_written"]).show(n=5)

+----------------+-----+------------+
|           title|price|year_written|
+----------------+-----+------------+
|Northanger Abbey| 18.2|        1814|
|   War and Peace| 12.7|        1865|
|   Anna Karenina| 13.5|        1875|
|   Mrs. Dalloway| 25.0|        1925|
|       The Hours|12.35|        1999|
+----------------+-----+------------+
only showing top 5 rows



Get books written after 1950 that cost more than $10 (rows without a title are excluded):

In [None]:
# Books written after 1950 that cost more than $10; rows with a null title are dropped.
df_filtered = df.where(
    (df.year_written > 1950) & (df.price > 10) & df.title.isNotNull()
)

df_filtered.select("title", "price", "year_written").show(n=50, truncate=False)

+-----------------------------+-----+------------+
|title                        |price|year_written|
+-----------------------------+-----+------------+
|The Hours                    |12.35|1999        |
|Harry Potter                 |19.95|2000        |
|One Hundred Years of Solitude|14.0 |1967        |
+-----------------------------+-----+------------+



# Get books that have Harry Potter in their title

In [None]:
# Distinct (title, year) pairs whose title contains "Harry Potter".
# NOTE(review): this searches df_filtered (post-1950, > $10), not every book in df.
(
    df_filtered
    .select("title", "year_written")
    .where(df_filtered.title.contains("Harry Potter"))
    .distinct()
    .show(n=20, truncate=False)
)

+------------+------------+
|title       |year_written|
+------------+------------+
|Harry Potter|2000        |
+------------+------------+



Using PySpark SQL functions to find the most expensive book:

In [None]:
from pyspark.sql import functions as F

# Find the costliest book among df_filtered.
# Use the namespaced F.max: `from pyspark.sql.functions import max` would shadow
# Python's built-in max() for every later cell in the notebook.
maxValue = df_filtered.agg(F.max("price")).first()[0]
print("maxValue: ", maxValue)
# Keep every book tied at the maximum price. F.col resolves "price" against this
# DataFrame rather than borrowing the column object from the parent `df`.
df_filtered.select("title", "price").filter(F.col("price") == maxValue).show(20, False)

maxValue:  19.95
+------------+-----+
|title       |price|
+------------+-----+
|Harry Potter|19.95|
+------------+-----+



In [None]:
# Download ngrok and start a background tunnel to local port 4050, the Spark UI port
# configured on the SparkSession, so the UI can be reached from outside the VM.
# NOTE(review): this exposes the Spark UI publicly without authentication. Current
# ngrok releases also require an authtoken, so this unauthenticated setup may fail.
# Its output is discarded, and the public URL is never printed. Confirm before relying on it.
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip &> /dev/null
!unzip ngrok-stable-linux-amd64.zip &> /dev/null
get_ipython().system_raw('./ngrok http 4050 &')

**Creating PySpark DataFrames**

There are a few ways to manually create PySpark DataFrames:

- `createDataFrame` (a `SparkSession` method)
- `create_df` (provided by the quinn library)
- `toDF` (called on an RDD)



**createDataFrame**
Here’s how to create a DataFrame with createDataFrame:

In [None]:
# Build a DataFrame from a list of tuples plus a list of column names;
# Spark infers the column types.
df = spark.createDataFrame(
    data=[("joe", 34), ("luisa", 22)],
    schema=["first_name", "age"],
)

df.show()

+----------+---+
|first_name|age|
+----------+---+
|       joe| 34|
|     luisa| 22|
+----------+---+



In [None]:
# Inferred types: the Python ints in `age` become a long column.
df.printSchema()

root
 |-- first_name: string (nullable = true)
 |-- age: long (nullable = true)



In [None]:
# Flat employee records: (firstname, middlename, lastname, dob, gender, salary).
# Empty strings stand in for missing name parts; Jen's salary of -1 is presumably a placeholder.
data = [
    ("James", "", "Smith", "1991-04-01", "M", 3000),
    ("Michael", "Rose", "", "2000-05-19", "M", 4000),
    ("Robert", "", "Williams", "1978-09-05", "M", 4000),
    ("Maria", "Anne", "Jones", "1967-12-01", "F", 4000),
    ("Jen", "Mary", "Brown", "1980-02-17", "F", -1),
]

columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
df2 = spark.createDataFrame(data, columns)

In [None]:
# Show up to 20 rows (all 5 employees).
df2.show(n=20)

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+



In [None]:
 dataDF = [(('James','','Smith'),'1991-04-01','M',3000),
  (('Michael','Rose',''),'2000-05-19','M',4000),
  (('Robert','','Williams'),'1978-09-05','M',4000),
  (('Maria','Anne','Jones'),'1967-12-01','F',4000),
  (('Jen','Mary','Brown'),'1980-02-17','F',-1)
]



Define a schema with a nested `name` struct:

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schema for dataDF: a nested `name` struct followed by three flat columns.
# The integer field is `salary`; naming it 'gender' again would create a duplicate
# column name and leave no salary column (later renamed to salary_amount).
schema = StructType([
    StructField('name', StructType([
        StructField('firstname', StringType(), True),
        StructField('middlename', StringType(), True),
        StructField('lastname', StringType(), True),
    ])),
    StructField('dob', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('salary', IntegerType(), True),
])

In [None]:
 df3 = spark.createDataFrame(data = dataDF, schema = schema)
 df.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gender: integer (nullable = true)



**PySpark withColumnRenamed – rename a DataFrame column**

PySpark DataFrames have a `withColumnRenamed()` method for changing a column name. It is the most straightforward approach. It takes two parameters, the existing column name and the new column name, and returns a new DataFrame.

In [None]:
# Rename one column. withColumnRenamed returns a new DataFrame, so df3 is left unchanged.
df3.withColumnRenamed(existing="dob", new="DateOfBirth").printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gender: integer (nullable = true)



**PySpark withColumnRenamed - To rename multiple columns**



In [None]:
# Rename several columns by chaining withColumnRenamed calls. Start from df3 (the
# nested-schema DataFrame), not `df`, which at this point holds first_name/age data.
# withColumnRenamed is a no-op for a column that does not exist, so re-running this
# cell leaves the schema unchanged.
df3 = (
    df3
    .withColumnRenamed("dob", "DateOfBirth")
    .withColumnRenamed("salary", "salary_amount")
)
df3.printSchema()

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- DateOfBirth: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- gender: integer (nullable = true)



In [None]:
# A flat schema of three string columns for the name parts.
# NOTE(review): schema2 is not used by any cell visible here.
schema2 = StructType(
    [
        StructField("fname", StringType()),
        StructField("middlename", StringType()),
        StructField("lname", StringType()),
    ]
)



You can also pass `createDataFrame` an RDD and a schema to construct DataFrames with precise control over column types and nullability:

In [None]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Distribute three Row objects as an RDD.
rdd = spark.sparkContext.parallelize([
    Row(name='Allie', age=2),
    Row(name='Sara', age=33),
    Row(name='Grace', age=31),
])

# Explicit schema: 'age' is declared non-nullable and as IntegerType, instead of
# the nullable long type Spark infers for Python ints.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), False),
])

df = spark.createDataFrame(rdd, schema)

df.show()

+-----+---+
| name|age|
+-----+---+
|Allie|  2|
| Sara| 33|
|Grace| 31|
+-----+---+



**create_df**

The create_df method defined in quinn allows for precise schema definition when creating DataFrames.

In [None]:
!pip install quinn

Collecting quinn
  Downloading quinn-0.10.0-py3-none-any.whl (9.4 kB)
Installing collected packages: quinn
Successfully installed quinn-0.10.0


In [None]:
from pyspark.sql.types import *
# Importing quinn.extensions is what makes `create_df` available as a method on the
# SparkSession (called below as spark.create_df).
from quinn.extensions import *

# create_df takes the row tuples plus one (column name, type, nullable) triple per
# column, so the schema is spelled out without building StructFields by hand.
df = spark.create_df(
    [("jose", "a"), ("li", "b"), ("sam", "c")],
    [("name", StringType(), True), ("blah", StringType(), True)]
)

df.show()

+----+----+
|name|blah|
+----+----+
|jose|   a|
|  li|   b|
| sam|   c|
+----+----+



**italicised text**

**toDF**  

You can also create an RDD and convert it to a DataFrame with `toDF`:

In [None]:
from pyspark.sql import Row

# Build Rows from (name, age) pairs and distribute them as an RDD. toDF() then
# derives the column names and types from the Row fields.
rdd = spark.sparkContext.parallelize(
    [Row(name=person, age=years) for person, years in [("Allie", 2), ("Sara", 33), ("Grace", 31)]]
)
df = rdd.toDF()
df.show()

+-----+---+
| name|age|
+-----+---+
|Allie|  2|
| Sara| 33|
|Grace| 31|
+-----+---+



Creating a DataFrame
To create our dataframe, we can start with a list of dictionaries in Python.

In [None]:
# One dict per movie, keyed index/title/release_date/genre.
# release_date is presumably a Unix timestamp in seconds — confirm against the data source.
movies = [
    dict(index=idx, title=name, release_date=released, genre=kind)
    for idx, name, released, kind in [
        (1, 'Shazam!', 1553299200, 'Comedy'),
        (2, 'Captain Marvel', 1551830400, 'Action'),
        (3, 'Escape Room', 1546473600, 'Horror'),
        (4, 'How to Train A Dragon', 1546473600, 'Animation'),
    ]
]

So here we have a list of movies displaying the title, release_date and genre of each movie.

And then we can use the createDataFrame method on the spark session to create our dataframe.

In [None]:
# No explicit schema is given, so Spark infers column names and types from the dicts.
movies_df = spark.createDataFrame(data=movies)
movies_df.show()

+---------+-----+------------+--------------------+
|    genre|index|release_date|               title|
+---------+-----+------------+--------------------+
|   Comedy|    1|  1553299200|             Shazam!|
|   Action|    2|  1551830400|      Captain Marvel|
|   Horror|    3|  1546473600|         Escape Room|
|Animation|    4|  1546473600|How to Train A Dr...|
+---------+-----+------------+--------------------+



The output above shows that our DataFrame organizes the data as a table, associating each record's values with named columns.

It also shows Spark's schema-on-read behavior: even without our specifying any data types, Spark inferred the type of each column.

In [None]:
# The types were inferred from the Python values (str -> string, int -> long).
movies_df.printSchema()

root
 |-- genre: string (nullable = true)
 |-- index: long (nullable = true)
 |-- release_date: long (nullable = true)
 |-- title: string (nullable = true)



From DataFrame to RDD

A PySpark DataFrame is built on an RDD under the hood, which we can access through `.rdd`.

In [None]:
# Access the RDD that underlies the DataFrame.
movies_df.rdd

MapPartitionsRDD[74] at javaToPython at NativeMethodAccessorImpl.java:0

In [None]:
# collect() is an action: it returns every Row of the RDD to the driver.
movies_df.rdd.collect()

[Row(genre='Comedy', index=1, release_date=1553299200, title='Shazam!'),
 Row(genre='Action', index=2, release_date=1551830400, title='Captain Marvel'),
 Row(genre='Horror', index=3, release_date=1546473600, title='Escape Room'),
 Row(genre='Animation', index=4, release_date=1546473600, title='How to Train A Dragon')]

# **It's distributed**
Even though this looks like a single unified dataset, it is split into partitions that can be distributed across different nodes.

In [None]:
# Number of partitions the underlying RDD is split into.
movies_df.rdd.getNumPartitions()

2

**It's lazy**

Because our DataFrame is built on RDDs, it also operates lazily. For example, to select the title of every row in the RDD, we use a `map` function. But because `map` is a transformation, it will not operate on our data until we follow it with an action.

In [None]:
# map() is a transformation: this only returns a new RDD; no titles are computed yet.
movies_df.rdd.map(lambda movie: movie['title'])

PythonRDD[75] at RDD at PythonRDD.scala:53

In [None]:
# Following the transformation with collect() (an action) triggers the computation.
movies_df.rdd.map(lambda movie: movie['title']).collect()

['Shazam!', 'Captain Marvel', 'Escape Room', 'How to Train A Dragon']

If we perform the equivalent operation with a dataframe, the operation is also treated as a transformation. Let's see this. Below, we'll select the title of each record.

In [None]:
# DataFrame select() is also lazy: the output is just the resulting DataFrame's schema.
movies_df.select('title')

DataFrame[title: string]

So again, Spark will not scan any records until an action such as `show()` is called.

In [None]:
# show() is an action, so the selection is actually executed here.
movies_df.select('title').show()

+--------------------+
|               title|
+--------------------+
|             Shazam!|
|      Captain Marvel|
|         Escape Room|
|How to Train A Dr...|
+--------------------+



In [None]:
# Select two columns by name (varargs form, equivalent to passing a list).
movies_df.select('title', 'genre').show()

+--------------------+---------+
|               title|    genre|
+--------------------+---------+
|             Shazam!|   Comedy|
|      Captain Marvel|   Action|
|         Escape Room|   Horror|
|How to Train A Dr...|Animation|
+--------------------+---------+

