In [21]:
from pyspark.sql import *
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import spark_partition_id

In [22]:
# Spark configuration for this notebook: application name "InitialRDD",
# master URL "local[3]" (local mode with 3 worker threads).
conf = (
    SparkConf()
    .setAppName("InitialRDD")
    .setMaster("local[3]")
)

# Reuse an already-running session if there is one; otherwise create a new
# session from the configuration above.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [23]:
# Read every Parquet file matching the glob into one DataFrame.
flight_parquet_df = spark.read.parquet("./../data/flight*.parquet")

# Preview the first rows to sanity-check the schema and values.
flight_parquet_df.show()

+----------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|   FL_DATE|OP_CARRIER|OP_CARRIER_FL_NUM|ORIGIN|ORIGIN_CITY_NAME|DEST|DEST_CITY_NAME|CRS_DEP_TIME|DEP_TIME|WHEELS_ON|TAXI_IN|CRS_ARR_TIME|ARR_TIME|CANCELLED|DISTANCE|
+----------+----------+-----------------+------+----------------+----+--------------+------------+--------+---------+-------+------------+--------+---------+--------+
|2000-01-01|        DL|             1451|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1115|    1113|     1343|      5|        1400|    1348|        0|     946|
|2000-01-01|        DL|             1479|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1315|    1311|     1536|      7|        1559|    1543|        0|     946|
|2000-01-01|        DL|             1857|   BOS|      Boston, MA| ATL|   Atlanta, GA|        1415|    1414|     1642|      9|        1721|    1651|        0|     946

In [24]:
# Redistribute the flight rows across 5 partitions.
flight_parquet_df_partitioned = flight_parquet_df.repartition(5)

# Report how many partitions the DataFrame now has ...
partition_total = flight_parquet_df_partitioned.rdd.getNumPartitions()
print(partition_total)

# ... and how many rows ended up in each one.
rows_per_partition = flight_parquet_df_partitioned.groupBy(spark_partition_id()).count()
rows_per_partition.show()



5
+--------------------+-----+
|SPARK_PARTITION_ID()|count|
+--------------------+-----+
|                   0|94096|
|                   1|94095|
|                   2|94095|
|                   3|94095|
|                   4|94096|
+--------------------+-----+



                                                                                

In [26]:
# Export the repartitioned flights as JSON under data/json/, replacing any
# previous output. Files are partitioned on disk by OP_CARRIER and ORIGIN,
# and each file is capped at 10000 records.
(
    flight_parquet_df_partitioned.write
    .mode("overwrite")
    .partitionBy("OP_CARRIER", "ORIGIN")
    .option("maxRecordsPerFile", 10000)
    .json("data/json/")
)

                                                                                

In [27]:
# Export the repartitioned flights as Parquet, replacing any previous output.
# NOTE(review): "partquet" looks like a typo for "parquet". The directory name
# is kept as is because renaming it would move the output -- confirm whether
# anything reads from this path before changing it.
(
    flight_parquet_df_partitioned.write
    .mode("overwrite")
    .parquet("data/partquet/")
)

                                                                                

In [None]:
# Shut down the Spark session once the notebook's work is finished.
spark.stop()