In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, BooleanType, ArrayType

In [2]:
spark = SparkSession.builder.master("local[*]").appName("Test7").getOrCreate()

- Parquet File maintain the schema along with the data. Hence it is used to process a structured file.
- Apache Parquet file is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model, or programming language.
- While querying columnar storage, it skips the nonrelevant data very quickly, making faster query execution. As a result aggregation queries consume less time compared to row-oriented databases.
- Pyspark SQL provides support for both reading and writing Parquet files that automatically capture the schema of the original data, It also reduces data storage by 75% on average. Pyspark by default supports Parquet in its library hence we don’t need to add any dependency libraries.

In [3]:
df1 = spark.read.parquet("D:\\LAB\\Spark\\zipcodes.parquet")
df1.printSchema()

root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Xaxis: double (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- TaxReturnsFiled: integer (nullable = true)
 |-- EstimatedPopulation: integer (nullable = true)
 |-- TotalWages: integer (nullable = true)
 |-- Notes: string (nullable = true)



In [4]:
df1.show()

+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|RecordNumber|Zipcode|ZipCodeType|               City|State|  LocationType|  Lat|   Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|        Notes|
+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|           1|    704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96| -66.22| 0.38|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           null|               null|      null|         null|
|           2|    70

# create temporary view

In [5]:
df1.createOrReplaceTempView("ParquetTable")
parkSQL = spark.sql("select * from ParquetTable where State = 'PR' and Country = 'US'")
parkSQL.show()

+------------+-------+-----------+-------------------+-----+--------------+-----+------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-----+
|RecordNumber|Zipcode|ZipCodeType|               City|State|  LocationType|  Lat|  Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|Notes|
+------------+-------+-----------+-------------------+-----+--------------+-----+------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-----+
|           1|    704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96|-66.22| 0.38|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           null|               null|      null| null|
|           2|    704|   STANDARD|PASEO COSTA DEL SUR|  

# create temporary view or table directly on the parquet file

In [9]:
spark.sql("create temporary view tbl1 using parquet options (path \'D:/LAB/Spark/zipcodes.parquet')")
spark.sql("select * from tbl1").show()

+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|RecordNumber|Zipcode|ZipCodeType|               City|State|  LocationType|  Lat|   Long|Xaxis|Yaxis|Zaxis|WorldRegion|Country|        LocationText|            Location|Decommisioned|TaxReturnsFiled|EstimatedPopulation|TotalWages|        Notes|
+------------+-------+-----------+-------------------+-----+--------------+-----+-------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------------+---------------+-------------------+----------+-------------+
|           1|    704|   STANDARD|        PARC PARQUE|   PR|NOT ACCEPTABLE|17.96| -66.22| 0.38|-0.87|  0.3|         NA|     US|     Parc Parque, PR|NA-US-PR-PARC PARQUE|        false|           null|               null|      null|         null|
|           2|    70

In [10]:
df2 = spark.read.parquet("D:\\LAB\\Spark\\userdata1.parquet")
df2.show()

+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|  registration_dttm| id|first_name|last_name|               email|gender|     ip_address|                 cc|             country| birthdate|   salary|               title|            comments|
+-------------------+---+----------+---------+--------------------+------+---------------+-------------------+--------------------+----------+---------+--------------------+--------------------+
|2016-02-03 13:25:29|  1|    Amanda|   Jordan|    ajordan0@com.com|Female|    1.197.201.2|   6759521864920116|           Indonesia|  3/8/1971| 49756.53|    Internal Auditor|               1E+02|
|2016-02-03 22:34:03|  2|    Albert|  Freeman|     afreeman1@is.gd|  Male| 218.111.175.34|                   |              Canada| 1/16/1968|150280.17|       Accountant IV|                    |
|2016-02-03 06:39:31|  3|

# Create Parquet Partition file

In [11]:
df2.write.partitionBy("gender","salary").mode("overwrite").parquet("D:\\LAB\\Spark\\jupyter_notebooks\\Output\\userdataop.parquet")

In [12]:
df2.write.partitionBy("gender","country").mode("overwrite").parquet("D:\\LAB\\Spark\\jupyter_notebooks\\Output\\userdataop1.parquet")

# Retrieving from a partitioned Parquet file

In [13]:
#reading partitioned parquet file into DataFrame with gender=Male
df3 = spark.read.parquet("D:\\LAB\\Spark\\jupyter_notebooks\\Output\\userdataop.parquet\\gender=Male")
df3.show(truncate = False)

+-------------------+---+----------+----------+---------------------------+---------------+-------------------+--------------------------------+----------+------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+
|registration_dttm  |id |first_name|last_name |email                      |ip_address     |cc                 |country                         |birthdate |title                         |comments                                                                                                                                                                                                             |salary   |
+-------------------+---+----------+----------+---------------------------+---------------+-------------------+--------------------------------+----------+-----------------------

# Creating a temporary view on Partitioned Parquet file

In [23]:
spark.sql("create temporary view User using parquet options (path \'D:/LAB/Spark/jupyter_notebooks/Output/userdataop.parquet/gender=Male')")
spark.sql("select * from User").show()

+-------------------+---+----------+----------+--------------------+---------------+-------------------+--------------------+----------+--------------------+----------------------+---------+
|  registration_dttm| id|first_name| last_name|               email|     ip_address|                 cc|             country| birthdate|               title|              comments|   salary|
+-------------------+---+----------+----------+--------------------+---------------+-------------------+--------------------+----------+--------------------+----------------------+---------+
|2016-02-04 00:33:02|797|    Joshua|  Morrison|jmorrisonm4@googl...| 135.197.145.65|                   |               China| 5/30/1986|Senior Sales Asso...|  Ṱ̺̺̕o͞ ̷i̲̬͇̪͙n̝...|165569.54|
|2016-02-04 03:44:37|477|      Fred|  Hamilton|fhamiltond8@intel...|   159.102.1.66| 675903849197748756|      Czech Republic|11/13/1972|Human Resources A...|  ̡͓̞ͅI̗̘̦͝n͇͇͙v̮̫o...|136458.61|
|2016-02-03 12:33:40|672|     Frank|   Perkin