In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 63.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=ede292a40d82ecd5db32a49e52cdaba90c9f031449412e4d9fad69e053b558dc
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
import pyspark
from pyspark.sql import SparkSession

# Avro support is not bundled with PySpark, so the external spark-avro module is pulled in
# here. Its version must match the installed PySpark, and it must be configured before the
# first SparkSession (and its JVM) is created. PyPI PySpark 3.3 is built for Scala 2.12.
AVRO_PACKAGE = f"org.apache.spark:spark-avro_2.12:{pyspark.__version__}"

spark = (
    SparkSession.builder
    .appName("Spark Read Write")
    .config("spark.jars.packages", AVRO_PACKAGE)
    .getOrCreate()
)

**Spark data source API**

DataFrameReader
  .format()
  .option()
  .schema()
  .load()

  e.g.

  spark.read.format('CSV').option('header','true').option("path","/drive/abc").schema(mySchema).load()

**Reading CSV, Parquet and JSON**

In [5]:
from google.colab import files

# Open Colab's file picker; the result maps each uploaded file name to its contents.
uploaded = files.upload()

# Echo the size of every uploaded file so the reader can confirm what arrived.
for file_name, content in uploaded.items():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=file_name, length=len(content)))

Saving flight-time.parquet to flight-time.parquet
User uploaded file "flight-time.parquet" with length 5001183 bytes


In [4]:
# Read the CSV using its first line as column names and let Spark infer the column types.
flightTimeCSVDf = (
    spark.read
    .format('CSV')
    .options(inferSchema='true', header='true')
    .load('flight-time.csv')
)

In [None]:
# Print the first 20 rows, truncating long cell values.
flightTimeCSVDf.show(n=20, truncate=True)

+--------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
| FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME|DEST|DEST_CITY_NAME|CRS_DEP_TIME|DEP_TIME|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|CANCELLED|DISTANCE|
+--------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|1/1/2000|        DL|             1451|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1115|    1113|     1343|      5|        1400|    1348|        0|     946|
|1/1/2000|        DL|             1479|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1315|    1311|     1536|      7|        1559|    1543|        0|     946|
|1/1/2000|        DL|             1857|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1415|    1414|     1642|      9|        1721|    1651|        0|     946|
|1/1/2000|

In [None]:
# Read the JSON file; no schema is supplied, so Spark derives one from the records.
flightTimeJasonDf = spark.read.json('flight-time.json')

In [None]:
# Show the schema Spark derived from the JSON records.
# NOTE(review): the `_corrupt_record` column in the output suggests some input lines could
# not be parsed as JSON records — worth filtering on it to inspect them.
flightTimeJasonDf.printSchema()

root
 |-- ARR_TIME: long (nullable = true)
 |-- CANCELLED: long (nullable = true)
 |-- CRS_ARR_TIME: long (nullable = true)
 |-- CRS_DEP_TIME: long (nullable = true)
 |-- DEP_TIME: long (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- DISTANCE: long (nullable = true)
 |-- FL_DATE: string (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: long (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- TAXI_IN: long (nullable = true)
 |-- WHEELS_ON: long (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [6]:
# Read the Parquet file; no header/schema options are needed because the column types
# come from the file itself (FL_DATE is already a date in the schema printed below).
flightTimeParquetDf = spark.read.parquet('flight-time.parquet')

In [None]:
# Show the schema stored in the Parquet file (typed columns, unlike the inferred CSV/JSON ones).
flightTimeParquetDf.printSchema()

root
 |-- FL_DATE: date (nullable = true)
 |-- OP_CARRIER: string (nullable = true)
 |-- OP_CARRIER_FL_NUM: integer (nullable = true)
 |-- ORIGIN: string (nullable = true)
 |-- ORIGIN_CITY_NAME: string (nullable = true)
 |-- DEST: string (nullable = true)
 |-- DEST_CITY_NAME: string (nullable = true)
 |-- CRS_DEP_TIME: integer (nullable = true)
 |-- DEP_TIME: integer (nullable = true)
 |-- WHEELS_ON: integer (nullable = true)
 |-- TAXI_IN: integer (nullable = true)
 |-- CRS_ARR_TIME: integer (nullable = true)
 |-- ARR_TIME: integer (nullable = true)
 |-- CANCELLED: integer (nullable = true)
 |-- DISTANCE: integer (nullable = true)



**Spark supported data types**

| Spark type | Python type |
|---|---|
| IntegerType | int |
| LongType | int |
| FloatType | float |
| DoubleType | float |
| StringType | str |
| DateType | datetime.date |
| TimestampType | datetime.datetime |
| ArrayType | list, tuple or array |
| MapType | dict |


In [None]:
# Define Schema
# There are 2 ways to define a schema: 1. Programmatically 2. Using a DDL string

# 1. Programmatically
from pyspark.sql.types import (
    StructType, StructField, DateType, IntegerType, FloatType, StringType,
)

# One StructField per column, in the same order as the flight-time.csv header, so the
# schema covers all 15 columns of the file (types match the Parquet copy of the data).
mySchema = StructType([
    StructField('FL_DATE', DateType()),
    StructField('OP_CARRIER', StringType()),
    StructField('OP_CARRIER_FL_NUM', IntegerType()),
    StructField('ORIGIN', StringType()),
    StructField('ORIGIN_CITY_NAME', StringType()),
    StructField('DEST', StringType()),
    StructField('DEST_CITY_NAME', StringType()),
    StructField('CRS_DEP_TIME', IntegerType()),
    StructField('DEP_TIME', IntegerType()),
    StructField('WHEELS_ON', IntegerType()),
    StructField('TAXI_IN', IntegerType()),
    StructField('CRS_ARR_TIME', IntegerType()),
    StructField('ARR_TIME', IntegerType()),
    StructField('CANCELLED', IntegerType()),
    StructField('DISTANCE', IntegerType()),
])

In [None]:
# Read the CSV with the explicit schema instead of inferring it.
# mode=FAILFAST makes the read fail on a malformed record instead of nulling it out.
# FL_DATE values look like 1/1/2000, so the date pattern must be given for the DateType
# column to parse.
# NOTE(review): the saved run of this cell raised AnalysisException — confirm that
# flight-time.csv is present in the working directory.
flightTimeCSVDf = (
    spark.read
    .format('CSV')
    .option('header', 'true')
    .schema(mySchema)
    .option('mode', 'FAILFAST')
    .option('dateFormat', 'M/d/y')
    .load('flight-time.csv')
)

AnalysisException: ignored

In [None]:
# 2. Using a DDL string
# Columns are listed in flight-time.csv header order, with the types of the Parquet copy
# of the data. The string can be passed straight to the reader, e.g.
#   spark.read.format('CSV').option('header', 'true').schema(FlightSchemaDDL).load('flight-time.csv')
FlightSchemaDDL = """FL_DATE DATE, OP_CARRIER STRING, OP_CARRIER_FL_NUM INT,
    ORIGIN STRING, ORIGIN_CITY_NAME STRING, DEST STRING, DEST_CITY_NAME STRING,
    CRS_DEP_TIME INT, DEP_TIME INT, WHEELS_ON INT, TAXI_IN INT,
    CRS_ARR_TIME INT, ARR_TIME INT, CANCELLED INT, DISTANCE INT"""

**DataFrameWriter**

DataFrameWriter .format() .option() .partitionBy() .bucketBy() .sortBy() .mode() .save()

e.g.

df.write.format('parquet').option('path','/abc/a.parquet').mode(saveMode).save()

where saveMode is one of 'append', 'overwrite', 'ignore' or 'errorifexists'.

Parquet is the default format for saving files.

**Spark File Layout**

1.   Number of files and file size
2.   Partitioning & bucketing
3.   Sorted data

Benefits of partitioning the data:
1. Parallel processing
2. Partition elimination (pruning)



In [None]:
from pyspark.sql.functions import spark_partition_id

# NOTE(review): `df` was never defined in this notebook; the Parquet-sourced flight
# DataFrame is assumed to be the intended input.
df = flightTimeParquetDf

# Avro output file creation.
# Avro is not bundled with PySpark: the external spark-avro package must be on the
# classpath (e.g. set via spark.jars.packages when the SparkSession is first built),
# otherwise this write fails.
df.write.format('avro').mode('overwrite').option('path', 'aa/dd.avro').save()

# Number of partitions in the DataFrame, then the row count held by each partition.
print(df.rdd.getNumPartitions())
df.groupBy(spark_partition_id()).count().show()

# JSON output, laid out in directories by OP_CARRIER and then ORIGIN value.
(
    df.write
    .format('json')
    .mode('overwrite')
    .option('path', 'aa/dd.json')
    .partitionBy('OP_CARRIER', 'ORIGIN')
    .save()
)


**Spark database and Table**

A database holds 2 kinds of objects:

1. table
2. View (only metadata)

A table has 2 parts:

1. Table data (stored in the Spark warehouse directory; Parquet files by default)
2. Table metadata (stored in the catalog metastore)

Types of Spark tables:

1. Managed tables
2. Unmanaged tables (external tables)

The difference between the two: for a managed table, the data is stored in the database's directory, at the warehouse location decided by the administrator. For an unmanaged table, the data resides elsewhere (externally), and Spark keeps only the metadata that links to it in the metadata catalog.

In [7]:
# Create a managed table. The name has no database qualifier, so the table goes into
# the current database (Spark's default database in a fresh session).
(
    flightTimeParquetDf.write
    .mode("overwrite")
    .saveAsTable("flight_data_tbl")
)

In [9]:
# To create the table in a database other than the default one, there are 2 options.
# Option 1: qualify the table name with the database name when creating it.
spark.sql("Create database if not exists airline_db")
(
    flightTimeParquetDf.write
    .mode("overwrite")
    .saveAsTable("airline_db.flight_data_tbl")
)

# Option 2: make that database the current one, then use the unqualified table name.
#spark.catalog.setCurrentDatabase("airline_db")

In [10]:
# Ask the catalog which tables are registered in airline_db.
print(spark.catalog.listTables(dbName="airline_db"))

[Table(name='flight_data_tbl', database='airline_db', description=None, tableType='MANAGED', isTemporary=False)]


In [11]:
# partitionBy: lay out the table's files by ORIGIN value, then by OP_CARRIER value.
(
    flightTimeParquetDf.write
    .mode("overwrite")
    .partitionBy("ORIGIN", "OP_CARRIER")
    .saveAsTable("airline_db.flight_data_tbl")
)

In [12]:
# bucketBy: distribute rows into a fixed number of buckets (5) by ORIGIN and OP_CARRIER.
# Preferred over partitionBy when the columns have too many distinct values.
(
    flightTimeParquetDf.write
    .mode("overwrite")
    .bucketBy(5, "ORIGIN", "OP_CARRIER")
    .saveAsTable("airline_db.flight_data_tbl")
)