In [34]:
# Install a headless Java 8 JDK for Spark to run on.
# -qq keeps apt quiet and stdout is discarded, so this cell prints nothing on success.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null


In [35]:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz


In [36]:
# Unpack the downloaded Spark archive into the current working directory.
# NOTE(review): SPARK_HOME is later set to /content/spark-3.0.0-bin-hadoop3.2,
# so this assumes the working directory is /content — confirm.
!tar xf spark-3.0.0-bin-hadoop3.2.tgz


In [37]:
!pip install -q findspark


In [38]:
import os

# Tell the Spark tooling where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.0.0-bin-hadoop3.2",
})

In [39]:
import findspark

# Initialise findspark before importing pyspark.
# NOTE(review): presumably this uses the SPARK_HOME set above to make the
# bundled pyspark importable — confirm against the findspark documentation.
findspark.init()

In [40]:
# Sanity check: display the Spark location findspark resolved (shown as the cell output).
findspark.find()

'/content/spark-3.0.0-bin-hadoop3.2/python/pyspark'

In [41]:
from pyspark.sql import SparkSession

# Create (or reuse) the notebook's Spark session: local master, app name
# "Colab", and the Spark UI bound to port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# PySpark Data Source

1. Read CSV file into a Data Frame

In [42]:
# Read the CSV with default options: no header handling, so (as the output
# shows) columns are named _c0.._c8, typed as strings, and the header line
# appears as the first data row.
df = spark.read.csv(path="/content/sample_data/california_housing_test.csv")

df.printSchema()
df.show()  # truncate=True is the default

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)
 |-- _c7: string (nullable = true)
 |-- _c8: string (nullable = true)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|        _c0|      _c1|               _c2|        _c3|           _c4|        _c5|       _c6|          _c7|               _c8|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000|606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.000000|1510.00

**Using Header records for column names**

In [43]:
# Read the CSV using its first row as column names.
# The DataFrame is assigned first and displayed in a separate statement:
# DataFrame.show() prints the rows and returns None, so chaining it onto the
# assignment would leave df1 bound to None instead of the DataFrame.
df1 = (
    spark.read
    .option("header", True)
    .csv("/content/sample_data/california_housing_test.csv")
)
df1.show()


+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000| 606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.000000|1510.000000|    310.000000| 809.000000| 277.000000|     3.599000|     176500.000000|
|-117.810000|33.780000|         27.000000|3589.000000|    507.000000|1484.000000| 495.000000|     5.793400|     270500.000000|
|-118.360000|33.820000|         28.000000|  67.000000|     15.000000|  49.000000|  11.000000|     6.135900|     330000.000000|
|-119.670000|36.330000|         19.000000|1241.000000|    244.000000| 850.000000| 237.000000|     2.937500|    

**Read Multiple CSV files**

`df = spark.read.csv(["path1", "path2", "path3"])`


`df = spark.read.csv("Folder path")`





In [44]:
# Read the CSV using its first row as column names.
# Keep the DataFrame in df1 and display it separately: DataFrame.show()
# returns None, so `df1 = ....show()` would store None rather than the data.
# (The stray trailing comma inside option(...) has also been dropped.)
df1 = (
    spark.read
    .option("header", True)
    .csv("/content/sample_data/california_housing_test.csv")
)
df1.show()


+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000| 606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.000000|1510.000000|    310.000000| 809.000000| 277.000000|     3.599000|     176500.000000|
|-117.810000|33.780000|         27.000000|3589.000000|    507.000000|1484.000000| 495.000000|     5.793400|     270500.000000|
|-118.360000|33.820000|         28.000000|  67.000000|     15.000000|  49.000000|  11.000000|     6.135900|     330000.000000|
|-119.670000|36.330000|         19.000000|1241.000000|    244.000000| 850.000000| 237.000000|     2.937500|    

# Read & Write Parquet File

Parquet is a columnar storage format: a query can skip the columns it does not need, which makes query execution faster.

**Read Parquet File into a Data Frame**

In [58]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# getOrCreate() hands back a session for the notebook; the sample rows below
# are turned into a DataFrame that the Parquet examples write and read back.
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()

# One tuple per person: (FirstName, LastName, Id, Gender, Salary).
data1 = [
    ("Smith", "Jone", "123", "M", 20000),
    ("Nance", "watch", "456", "F", 10000),
    ("Rani", "Singh", "234", "F", 30000),
    ("Raju", "Thomas", "897", "M", 15000),
    ("Ranjan", "Sharma", "862", "M", 25000),
]

# Explicit schema: every column is a nullable string except the integer Salary.
schema = (
    StructType()
    .add("FirstName", StringType(), True)
    .add("LastName", StringType(), True)
    .add("Id", StringType(), True)
    .add("Gender", StringType(), True)
    .add("Salary", IntegerType(), True)
)

df = spark.createDataFrame(data1, schema)
df.printSchema()
df.show()


root
 |-- FirstName: string (nullable = true)
 |-- LastName: string (nullable = true)
 |-- Id: string (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Salary: integer (nullable = true)

+---------+--------+---+------+------+
|FirstName|LastName| Id|Gender|Salary|
+---------+--------+---+------+------+
|    Smith|    Jone|123|     M| 20000|
|    Nance|   watch|456|     F| 10000|
|     Rani|   Singh|234|     F| 30000|
|     Raju|  Thomas|897|     M| 15000|
|   Ranjan|  Sharma|862|     M| 25000|
+---------+--------+---+------+------+



Writing a Data Frame into a Parquet File

In [65]:
# Write the DataFrame as Parquet. mode("overwrite") replaces any previous
# output so the cell can be re-run; the default save mode raises an error
# when the target path already exists.
df.write.mode("overwrite").parquet("/content/peoples.parquet")

Reading the Parquet file into a DataFrame

In [66]:
# Load the Parquet output written above back into a DataFrame.
parDF = spark.read.format("parquet").load("/content/peoples.parquet")

In [67]:
# Display the rows read back from Parquet; the output matches the DataFrame that was written.
parDF.show()

+---------+--------+---+------+------+
|FirstName|LastName| Id|Gender|Salary|
+---------+--------+---+------+------+
|    Smith|    Jone|123|     M| 20000|
|    Nance|   watch|456|     F| 10000|
|     Rani|   Singh|234|     F| 30000|
|     Raju|  Thomas|897|     M| 15000|
|   Ranjan|  Sharma|862|     M| 25000|
+---------+--------+---+------+------+



**Executing SQL queries**

PySpark SQL lets you create temporary views on Parquet files and run SQL queries against them. These views remain available until your program exits.

In [69]:
# Expose the Parquet-backed DataFrame to SQL under the name "ParquetTable".
parDF.createOrReplaceTempView("ParquetTable")

# Keep the query result in parkSQL and display it in a separate statement:
# DataFrame.show() returns None, so chaining it onto the assignment would
# leave parkSQL bound to None instead of the result DataFrame.
parkSQL = spark.sql("select * from ParquetTable where Salary >= 20000")
parkSQL.show()

+---------+--------+---+------+------+
|FirstName|LastName| Id|Gender|Salary|
+---------+--------+---+------+------+
|    Smith|    Jone|123|     M| 20000|
|     Rani|   Singh|234|     F| 30000|
|   Ranjan|  Sharma|862|     M| 25000|
+---------+--------+---+------+------+



Creating a temporary view directly on a Parquet file

In [70]:
# Define a temporary view directly over the Parquet files.
# CREATE OR REPLACE keeps the cell re-runnable: a plain CREATE TEMPORARY VIEW
# fails once the view already exists in the session.
spark.sql(
    'CREATE OR REPLACE TEMPORARY VIEW PERSON USING parquet '
    'OPTIONS (path "/content/peoples.parquet")'
)
spark.sql("SELECT * FROM PERSON").show()

+---------+--------+---+------+------+
|FirstName|LastName| Id|Gender|Salary|
+---------+--------+---+------+------+
|    Smith|    Jone|123|     M| 20000|
|    Nance|   watch|456|     F| 10000|
|     Rani|   Singh|234|     F| 30000|
|     Raju|  Thomas|897|     M| 15000|
|   Ranjan|  Sharma|862|     M| 25000|
+---------+--------+---+------+------+



Create a partitioned Parquet file

In [71]:
# Write the DataFrame partitioned by Gender and then Salary, replacing any
# previous output at the target path.
(
    df.write
    .partitionBy("Gender", "Salary")
    .mode("overwrite")
    .parquet("/content/peoples2.parquet")
)

Retrieving data from a partitioned Parquet file

In [73]:
# Read only the Gender=M partition directory. As the output shows, the result
# has no Gender column because the read starts below that partition level.
parDF2 = spark.read.format("parquet").load("/content/peoples2.parquet/Gender=M")
parDF2.show(truncate=False)

+---------+--------+---+------+
|FirstName|LastName|Id |Salary|
+---------+--------+---+------+
|Ranjan   |Sharma  |862|25000 |
|Raju     |Thomas  |897|15000 |
|Smith    |Jone    |123|20000 |
+---------+--------+---+------+



# Reading JSON File into DataFrame

In [75]:
# Read a JSON file; the schema is inferred from the data.
# NOTE(review): the inferred schema includes a _corrupt_record column, which
# suggests some lines failed to parse — confirm the input file.
df1 = spark.read.json(path="/zipcode.json")

df1.printSchema()
df1.show(truncate=False)

root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- EstimatedPopulation: long (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Long: double (nullable = true)
 |-- Notes: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- TaxReturnsFiled: long (nullable = true)
 |-- TotalWages: long (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Xaxis: double (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)
 |-- _corrupt_record: string (nullable = true)

+-------------------+-------+-------------+-------------------+-----+----------------------------+-----------------------+--------------+------

Read a multi-line JSON file

In [76]:
# Read a JSON file whose records span several lines, by enabling the
# "multiline" reader option.
multiline_df = (
    spark.read
    .option("multiline", "true")
    .json("/multiline-zipcode.json")
)
multiline_df.show()

+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+



Reading Multiple files at a time

In [77]:
# Pass a list of paths to load several JSON files into a single DataFrame.
df2 = spark.read.json(
    [
        '/zipcode.json',
        '/multiline-zipcode.json',
    ]
)
df2.show()

+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+---------------+
|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|State|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|_corrupt_record|
+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+---------------+
|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|         null|           1|   PR|           null|      null|         NA| 0.38|-0.87

PySpark SQL can also read a JSON file by creating a temporary view directly over the file, using `spark.sql("CREATE OR REPLACE TEMPORARY VIEW ... USING json OPTIONS (path '...')")`.

In [83]:

# Register (or replace) a temporary view named "zipcode" backed by the JSON
# file, then query it with plain SQL.
spark.sql(
    "CREATE OR REPLACE TEMPORARY VIEW zipcode USING json OPTIONS (path '/zipcode.json')"
)
spark.sql("select * from zipcode").show()


+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+---------------+
|               City|Country|Decommisioned|EstimatedPopulation|  Lat|            Location|        LocationText|  LocationType|   Long|        Notes|RecordNumber|State|TaxReturnsFiled|TotalWages|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|_corrupt_record|
+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+----------+-----------+-----+-----+-----+-----------+-------+---------------+
|        PARC PARQUE|     US|        false|               null|17.96|NA-US-PR-PARC PARQUE|     Parc Parque, PR|NOT ACCEPTABLE| -66.22|         null|           1|   PR|           null|      null|         NA| 0.38|-0.87