In [1]:
# Bootstrap: findspark.init() is called before any pyspark import so that
# `pyspark` can be found from this kernel.
# First-time setup (uncomment; %pip installs into this kernel's environment):
# %pip install findspark
import findspark
findspark.init()

In [2]:
# imports
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [3]:
# Create a spark session
spark = SparkSession.builder.getOrCreate()

In [7]:
# Small in-memory sample: one tuple per person, column names given explicitly.
# NOTE(review): several first names carry a trailing space ("James ") and
# `dob` holds numeric-looking strings (one empty) — kept as-is because the
# outputs below depend on them.
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]
data = [
    ("James ", "MM", "Smith", "36636", "M", 3300),
    ("Maria ", "Rose", "Hills", "40288", "F", 5000),
    ("Robert ", "Fox", "Williams", "42114", "M", 4400),
    ("Loris", "Anne", "Jones", "39192", "M", 6000),
    ("Jen", "Mary", "Brown", "", "F", 3333),
]

df = spark.createDataFrame(data, schema=columns)
df.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|   James |        MM|   Smith|36636|     M|  3300|
|   Maria |      Rose|   Hills|40288|     F|  5000|
|  Robert |       Fox|Williams|42114|     M|  4400|
|    Loris|      Anne|   Jones|39192|     M|  6000|
|      Jen|      Mary|   Brown|     |     F|  3333|
+---------+----------+--------+-----+------+------+



In [8]:
# Persist the sample frame as Parquet. mode("overwrite") makes the cell
# re-runnable: the default save mode raises an error if the path already exists.
# NOTE(review): ".paquet" looks like a typo for ".parquet"; left unchanged
# because later cells read from this exact path.
df.write.mode("overwrite").parquet("data/people.paquet")

In [9]:
# Bring the first-name column back to the driver as a plain Python list.
[row["firstname"] for row in df.select("firstname").collect()]

['James ', 'Maria ', 'Robert ', 'Loris', 'Jen']

In [13]:
# Load the Parquet output back into a DataFrame; note the row order differs
# from the in-memory frame shown earlier.
read_parq = spark.read.format("parquet").load("data/people.paquet")
read_parq.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |       Fox|Williams|42114|     M|  4400|
|   Maria |      Rose|   Hills|40288|     F|  5000|
|    Loris|      Anne|   Jones|39192|     M|  6000|
|   James |        MM|   Smith|36636|     M|  3300|
|      Jen|      Mary|   Brown|     |     F|  3333|
+---------+----------+--------+-----+------+------+



In [14]:
# Register the Parquet-backed frame as a temp view and query it with SQL.
read_parq.createOrReplaceTempView("ParquetTable")

# Keep the query result (a DataFrame) and display it in a separate call:
# DataFrame.show() only prints and returns None, so assigning its result
# would store None instead of the DataFrame.
view_parq = spark.sql("select * from ParquetTable")
view_parq.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|gender|salary|
+---------+----------+--------+-----+------+------+
|  Robert |       Fox|Williams|42114|     M|  4400|
|   Maria |      Rose|   Hills|40288|     F|  5000|
|    Loris|      Anne|   Jones|39192|     M|  6000|
|   James |        MM|   Smith|36636|     M|  3300|
|      Jen|      Mary|   Brown|     |     F|  3333|
+---------+----------+--------+-----+------+------+



In [16]:
# Write the sample partitioned by gender: one sub-directory per value
# (e.g. gender=M). Any previous output at this path is overwritten.
(
    df.write
    .mode("overwrite")
    .partitionBy("gender")
    .parquet("data/people_by_gender.paquet")
)

In [23]:
# Read the whole partitioned dataset; `gender` is reconstructed from the
# directory names and appears as the last column.
read_partt = spark.read.format("parquet").load("data/people_by_gender.paquet")
read_partt.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|salary|gender|
+---------+----------+--------+-----+------+------+
|  Robert |       Fox|Williams|42114|  4400|     M|
|   Maria |      Rose|   Hills|40288|  5000|     F|
|    Loris|      Anne|   Jones|39192|  6000|     M|
|   James |        MM|   Smith|36636|  3300|     M|
|      Jen|      Mary|   Brown|     |  3333|     F|
+---------+----------+--------+-----+------+------+



In [24]:
# Expose the partitioned dataset to SQL as a temp view and query it.
read_partt.createOrReplaceTempView("ParquetTable_Gender")

# show() returns None, so keep the DataFrame and display it separately.
view_partt = spark.sql("select * from ParquetTable_Gender")
view_partt.show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|  dob|salary|gender|
+---------+----------+--------+-----+------+------+
|  Robert |       Fox|Williams|42114|  4400|     M|
|   Maria |      Rose|   Hills|40288|  5000|     F|
|    Loris|      Anne|   Jones|39192|  6000|     M|
|   James |        MM|   Smith|36636|  3300|     M|
|      Jen|      Mary|   Brown|     |  3333|     F|
+---------+----------+--------+-----+------+------+



In [26]:
# Read only the gender=M partition directory. Because the path points inside
# the partition, the result has no `gender` column.
read_by_gender_M = spark.read.format("parquet").load(
    "data/people_by_gender.paquet/gender=M"
)
read_by_gender_M.show()

+---------+----------+--------+-----+------+
|firstname|middlename|lastname|  dob|salary|
+---------+----------+--------+-----+------+
|  Robert |       Fox|Williams|42114|  4400|
|    Loris|      Anne|   Jones|39192|  6000|
|   James |        MM|   Smith|36636|  3300|
+---------+----------+--------+-----+------+



In [27]:
# Register the gender=M partition as a temp view and query it with SQL.
read_by_gender_M.createOrReplaceTempView("ParquetTable_M")

# show() returns None, so keep the DataFrame and display it separately.
view_by_gender_M = spark.sql("select * from ParquetTable_M")
view_by_gender_M.show()

+---------+----------+--------+-----+------+
|firstname|middlename|lastname|  dob|salary|
+---------+----------+--------+-----+------+
|  Robert |       Fox|Williams|42114|  4400|
|    Loris|      Anne|   Jones|39192|  6000|
|   James |        MM|   Smith|36636|  3300|
+---------+----------+--------+-----+------+



In [29]:
# Define a temp view directly over the gender=F partition directory in SQL.
# OR REPLACE makes the cell re-runnable: a plain CREATE TEMPORARY VIEW fails
# once the view already exists in this session.
spark.sql(
    "CREATE OR REPLACE TEMPORARY VIEW Person_Gender_F "
    "USING parquet "
    'OPTIONS (path "data/people_by_gender.paquet/gender=F")'
)

spark.sql("select * from Person_Gender_F").show()

+---------+----------+--------+-----+------+
|firstname|middlename|lastname|  dob|salary|
+---------+----------+--------+-----+------+
|   Maria |      Rose|   Hills|40288|  5000|
|      Jen|      Mary|   Brown|     |  3333|
+---------+----------+--------+-----+------+



In [38]:
# Load customers.csv, treating the first line as the header and letting Spark
# infer column types (presumably Date becomes an integer — check
# df_csv.printSchema()).
df_csv = spark.read.csv("data/customers.csv", header=True, inferSchema=True)
df_csv.show()

+-------+------+
|   Name|  Date|
+-------+------+
|  James|202108|
|Michael|202106|
| Robert|202108|
|  Maria|202106|
|    Jen|202108|
+-------+------+



In [None]:
# Turn the yyyyMM values in `Date` (e.g. 202108) into a real date
# (e.g. 2021-08-01, the first day of that month).
# The cast to string makes the pattern apply whether `Date` was inferred as a
# number or as text. The result is a new column on a new frame, so `df_csv`
# (written to Parquet below) stays unchanged.
df_csv_dated = df_csv.withColumn(
    "month_start",
    F.to_date(F.col("Date").cast("string"), "yyyyMM"),
)
df_csv_dated.show()

In [39]:
# Write the CSV data to Parquet (unpartitioned). "overwrite" keeps the cell
# re-runnable; the partitioned write further down also targets this path, so
# without it a second run fails with "path already exists".
df_csv.write.mode("overwrite").parquet("data/dt.paquet")

In [40]:
# Read the Parquet copy of the customer data back into a DataFrame.
rd_p = spark.read.format("parquet").load("data/dt.paquet")
rd_p.show()

+-------+------+
|   Name|  Date|
+-------+------+
|  James|202108|
|Michael|202106|
| Robert|202108|
|  Maria|202106|
|    Jen|202108|
+-------+------+



In [41]:
# Register the customer Parquet data as a temp view and query it with SQL.
rd_p.createOrReplaceTempView("ParquetTable_DT")

# show() returns None, so keep the DataFrame and display it separately.
view_dt = spark.sql("select * from ParquetTable_DT")
view_dt.show()

+-------+------+
|   Name|  Date|
+-------+------+
|  James|202108|
|Michael|202106|
| Robert|202108|
|  Maria|202106|
|    Jen|202108|
+-------+------+



In [43]:
# Rewrite the customer data partitioned by its date column (one
# date=<value> directory per distinct value), replacing the unpartitioned
# copy at this path. "date" resolves to the `Date` column here because
# Spark's column resolution is case-insensitive by default.
(
    df_csv.write
    .mode("overwrite")
    .partitionBy("date")
    .parquet("data/dt.paquet")
)

In [45]:
# Read a single date partition (202106); the partition column itself is not
# part of the result, only `Name` remains.
rd_pd = spark.read.format("parquet").load("data/dt.paquet/date=202106")
rd_pd.show()

+-------+
|   Name|
+-------+
|Michael|
|  Maria|
+-------+



In [51]:
# Define a temp view over the date=202106 partition directory in SQL.
# OR REPLACE makes the cell re-runnable: a plain CREATE TEMPORARY VIEW fails
# once the view already exists in this session.
spark.sql(
    "CREATE OR REPLACE TEMPORARY VIEW ParquetTable_DTD "
    "USING parquet "
    'OPTIONS (path "data/dt.paquet/date=202106")'
)

spark.sql("select * from ParquetTable_DTD").show()

+-------+
|   Name|
+-------+
|Michael|
|  Maria|
+-------+

