In [0]:
%fs ls /public/retail_db

path,name,size,modificationTime
dbfs:/public/retail_db/README.md,README.md,826,1688522119000
dbfs:/public/retail_db/categories/,categories/,0,1688522106000
dbfs:/public/retail_db/create_db.sql,create_db.sql,10303495,1688522111000
dbfs:/public/retail_db/create_db_tables_pg.sql,create_db_tables_pg.sql,1830,1688522112000
dbfs:/public/retail_db/customers/,customers/,0,1688522112000
dbfs:/public/retail_db/departments/,departments/,0,1688522113000
dbfs:/public/retail_db/load_db_tables_pg.sql,load_db_tables_pg.sql,10297392,1688522116000
dbfs:/public/retail_db/order_items/,order_items/,0,1688522117000
dbfs:/public/retail_db/orders/,orders/,0,1688522116000
dbfs:/public/retail_db/products/,products/,0,1688522119000


In [0]:
# Column names and types for the orders data, written as a DDL-style string.
# It is handed to spark.read.schema(...) when the CSV files are loaded.
schema = """ 
    order_id INT,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status STRING
"""

In [0]:
# Read the orders CSV files with the explicit schema instead of letting Spark
# pick column names and types on its own.
orders = spark.read.csv('/public/retail_db/orders', schema=schema)

In [0]:
# Print a tabular preview of the CSV-backed orders DataFrame.
orders.show()

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:00|            12111|       COMPLETE|
|       4|2013-07-25 00:00:00|             8827|         CLOSED|
|       5|2013-07-25 00:00:00|            11318|       COMPLETE|
|       6|2013-07-25 00:00:00|             7130|       COMPLETE|
|       7|2013-07-25 00:00:00|             4530|       COMPLETE|
|       8|2013-07-25 00:00:00|             2911|     PROCESSING|
|       9|2013-07-25 00:00:00|             5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:00|             5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:00|              918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:00|             1837|         CLOSED|
|      13|2013-07-25 00:0

In [0]:
# Confirm the column types match the explicit schema supplied at read time.
orders.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- order_customer_id: integer (nullable = true)
 |-- order_status: string (nullable = true)



In [0]:
# Read the JSON copy of the orders data; no schema is supplied, so column
# names and types come from the files themselves.
orders_json = spark.read.format('json').load('/public/retail_db_json/orders')

In [0]:
# Print a tabular preview of the JSON-backed orders DataFrame.
orders_json.show()

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|            12111|2013-07-25 00:00:...|       3|       COMPLETE|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|
|            11318|2013-07-25 00:00:...|       5|       COMPLETE|
|             7130|2013-07-25 00:00:...|       6|       COMPLETE|
|             4530|2013-07-25 00:00:...|       7|       COMPLETE|
|             2911|2013-07-25 00:00:...|       8|     PROCESSING|
|             5657|2013-07-25 00:00:...|       9|PENDING_PAYMENT|
|             5648|2013-07-25 00:00:...|      10|PENDING_PAYMENT|
|              918|2013-07-25 00:00:...|      11| PAYMENT_REVIEW|
|             1837|2013-07-25 00:00:...|      12|         CLOSED|
|         

In [0]:
# Inspect the column types Spark derived from the JSON files (no explicit schema was given).
orders_json.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [0]:
# Look up the login name of the user running this notebook process; it is used
# below to build a per-user output directory.
# NOTE(review): the directory listings further down show this resolving to
# `root` — confirm that is the intended location when the cluster is shared.
import getpass
username = getpass.getuser()

In [0]:
# Source folder holding the JSON data sets, and the per-user target folder
# that will receive their Parquet copies.
input_dir = '/public/retail_db_json'
output_dir = f'/user/{username}/retail_db_parquet'

In [0]:
# List everything directly under input_dir (data set folders plus any loose files).
dbutils.fs.ls(input_dir)

[FileInfo(path='dbfs:/public/retail_db_json/categories/', name='categories/', size=0, modificationTime=1688523866000),
 FileInfo(path='dbfs:/public/retail_db_json/create_db_tables_pg.sql', name='create_db_tables_pg.sql', size=1830, modificationTime=1688523867000),
 FileInfo(path='dbfs:/public/retail_db_json/customers/', name='customers/', size=0, modificationTime=1688523868000),
 FileInfo(path='dbfs:/public/retail_db_json/departments/', name='departments/', size=0, modificationTime=1688523872000),
 FileInfo(path='dbfs:/public/retail_db_json/order_items/', name='order_items/', size=0, modificationTime=1688523875000),
 FileInfo(path='dbfs:/public/retail_db_json/orders/', name='orders/', size=0, modificationTime=1688523872000),
 FileInfo(path='dbfs:/public/retail_db_json/products/', name='products/', size=0, modificationTime=1688523882000)]

In [0]:
# Print just the full path of each entry under input_dir, one per line.
for file_details in dbutils.fs.ls(input_dir):
    print(file_details.path)

dbfs:/public/retail_db_json/categories/
dbfs:/public/retail_db_json/create_db_tables_pg.sql
dbfs:/public/retail_db_json/customers/
dbfs:/public/retail_db_json/departments/
dbfs:/public/retail_db_json/order_items/
dbfs:/public/retail_db_json/orders/
dbfs:/public/retail_db_json/products/


In [0]:
# Convert every data set folder under input_dir from JSON to Parquet, writing
# each one to a folder of the same name under output_dir.
#
# Only directory entries are converted. dbutils.fs.ls reports folders with a
# trailing '/' in `name`, while loose files (such as the .sql script) have none.
# Requiring a folder, rather than merely excluding '*sql' paths, keeps any other
# stray file from being read as JSON and written under the parent folder's name.
for file_details in dbutils.fs.ls(input_dir):
    if not file_details.name.endswith('/') or '.git' in file_details.path:
        continue
    print(f'Converting data in {file_details.path} folder from json to parquet')
    # Folder name without its trailing slash, e.g. 'orders/' -> 'orders'.
    data_set_dir = file_details.name.rstrip('/')
    df = spark.read.json(file_details.path)
    # coalesce(1) produces a single part file per data set; mode='overwrite'
    # lets this cell be re-run without failing on existing output.
    df.coalesce(1).write.parquet(f'{output_dir}/{data_set_dir}', mode='overwrite')

Converting data in dbfs:/public/retail_db_json/categories/ folder from json to parquet
Converting data in dbfs:/public/retail_db_json/customers/ folder from json to parquet
Converting data in dbfs:/public/retail_db_json/departments/ folder from json to parquet
Converting data in dbfs:/public/retail_db_json/order_items/ folder from json to parquet
Converting data in dbfs:/public/retail_db_json/orders/ folder from json to parquet
Converting data in dbfs:/public/retail_db_json/products/ folder from json to parquet


In [0]:
# List the converted data set folders. Reuses output_dir (the location the
# conversion loop wrote to) instead of re-spelling the path, so the two cannot drift apart.
dbutils.fs.ls(output_dir)

[FileInfo(path='dbfs:/user/root/retail_db_parquet/categories/', name='categories/', size=0, modificationTime=1697981152000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/customers/', name='customers/', size=0, modificationTime=1697981153000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/departments/', name='departments/', size=0, modificationTime=1697981155000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/order_items/', name='order_items/', size=0, modificationTime=1697981157000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/', name='orders/', size=0, modificationTime=1697981160000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/products/', name='products/', size=0, modificationTime=1697981161000)]

In [0]:
# List the files written for the orders data set. The path is built from
# output_dir so it always points at the location the conversion wrote to.
dbutils.fs.ls(f'{output_dir}/orders')

[FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/_SUCCESS', name='_SUCCESS', size=0, modificationTime=1697981160000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/_committed_8179953115895123963', name='_committed_8179953115895123963', size=123, modificationTime=1697981160000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/_started_8179953115895123963', name='_started_8179953115895123963', size=0, modificationTime=1697981160000),
 FileInfo(path='dbfs:/user/root/retail_db_parquet/orders/part-00000-tid-8179953115895123963-c35ccadf-1cdc-40b7-a979-8e858c75fe85-17-1.c000.snappy.parquet', name='part-00000-tid-8179953115895123963-c35ccadf-1cdc-40b7-a979-8e858c75fe85-17-1.c000.snappy.parquet', size=489027, modificationTime=1697981160000)]

In [0]:
# Read the orders data back from its Parquet copy. The path is built from
# output_dir so it always matches the location the conversion wrote to.
# NOTE: this rebinds `orders`, which earlier held the CSV-backed DataFrame
# read with the explicit schema; from here on it refers to the Parquet data.
orders = spark.read.parquet(f'{output_dir}/orders')

In [0]:
# Print a tabular preview of the Parquet-backed orders DataFrame.
orders.show()

+-----------------+--------------------+--------+---------------+
|order_customer_id|          order_date|order_id|   order_status|
+-----------------+--------------------+--------+---------------+
|            11599|2013-07-25 00:00:...|       1|         CLOSED|
|              256|2013-07-25 00:00:...|       2|PENDING_PAYMENT|
|            12111|2013-07-25 00:00:...|       3|       COMPLETE|
|             8827|2013-07-25 00:00:...|       4|         CLOSED|
|            11318|2013-07-25 00:00:...|       5|       COMPLETE|
|             7130|2013-07-25 00:00:...|       6|       COMPLETE|
|             4530|2013-07-25 00:00:...|       7|       COMPLETE|
|             2911|2013-07-25 00:00:...|       8|     PROCESSING|
|             5657|2013-07-25 00:00:...|       9|PENDING_PAYMENT|
|             5648|2013-07-25 00:00:...|      10|PENDING_PAYMENT|
|              918|2013-07-25 00:00:...|      11| PAYMENT_REVIEW|
|             1837|2013-07-25 00:00:...|      12|         CLOSED|
|         

In [0]:
# Inspect the column types stored in the Parquet files. The output shows the
# same types as the JSON read (long ids, string order_date), not the explicit CSV schema.
orders.printSchema()

root
 |-- order_customer_id: long (nullable = true)
 |-- order_date: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)

