In [0]:
spark

<pyspark.sql.connect.session.SparkSession at 0x7f1596ea9910>

In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

sss_schema = StructType([
    StructField('row_id', StringType(), True),
    StructField('order_id', StringType(), True),
    StructField('order_date', StringType(), True),
    StructField('ship_date', StringType(), True),
    StructField('ship_mode', StringType(), True),
    StructField('customer_id', StringType(), True),
    StructField('customer_name', StringType(), True),
    StructField('segment', StringType(), True),
    StructField('country', StringType(), True),
    StructField('city', StringType(), True),
    StructField('state', StringType(), True),
    StructField('postal_code', LongType(), True),
    StructField('region', StringType(), True),
    StructField('product_id', StringType(), True),
    StructField('category', StringType(), True),
    StructField('sub_category', StringType(), True),
    StructField('product_name', StringType(), True),
    StructField('sales', DoubleType(), True),
    StructField('quantity', LongType(), True),
    StructField('discount', DoubleType(), True),
    StructField('profit', DoubleType(), True)
])

In [0]:
S3_INPUT_PATH = 's3://your-bucket/path/to/input-file'
S3_OUTPUT_PATH = 's3://your-bucket/path/to/output-dir/'

df = spark.read.format('csv').option('header',True).schema(sss_schema).load(S3_INPUT_PATH)

In [0]:
df.printSchema()

root
 |-- row_id: string (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_date: string (nullable = true)
 |-- ship_date: string (nullable = true)
 |-- ship_mode: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- region: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- quantity: long (nullable = true)
 |-- discount: double (nullable = true)
 |-- profit: double (nullable = true)



In [0]:
df = df.drop('row_id')
# df.limit(10).display()

In [0]:
from pyspark.sql.functions import regexp_replace

df_insert = df.withColumn('order_date', regexp_replace(col('order_date'),r"^(\d{1}/)", r"0\1/"))\
    .withColumn('order_date', regexp_replace(col('order_date'),r"/(\d{1})/", r"/0\1/"))\
    .withColumn('ship_date', regexp_replace(col('ship_date'),r"^(\d{1}/)", r"0\1/"))\
    .withColumn('ship_date', regexp_replace(col('ship_date'),r"/(\d{1})/", r"/0\1/"))


In [0]:
df_insert.limit(10).display()

order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,country,city,state,postal_code,region,product_id,category,sub_category,product_name,sales,quantity,discount,profit
CA-2016-152156,11/01/2016,11/11/2016,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase,261.96,2,0.0,41.9136
CA-2016-152156,11/01/2016,11/11/2016,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back",731.94,3,0.0,219.582
CA-2016-138688,01/12/2016,01/16/2016,Second Class,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036,West,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters by Universal,14.62,2,0.0,6.8714
US-2015-108966,10/11/2015,10/18/2015,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,FUR-TA-10000577,Furniture,Tables,Bretford CR4500 Series Slim Rectangular Table,957.5775,5,0.45,-383.031
US-2015-108966,10/11/2015,10/18/2015,Standard Class,SO-20335,Sean O'Donnell,Consumer,United States,Fort Lauderdale,Florida,33311,South,OFF-ST-10000760,Office Supplies,Storage,Eldon Fold 'N Roll Cart System,22.368,2,0.2,2.5164
CA-2014-115812,01/01/2014,01/14/2014,Standard Class,BH-11710,Brosina Hoffman,Consumer,United States,Los Angeles,California,90032,West,FUR-FU-10001487,Furniture,Furnishings,"Eldon Expressions Wood and Plastic Desk Accessories, Cherry Wood",48.86,7,0.0,14.1694
CA-2014-115812,01/01/2014,01/14/2014,Standard Class,BH-11710,Brosina Hoffman,Consumer,United States,Los Angeles,California,90032,West,OFF-AR-10002833,Office Supplies,Art,Newell 322,7.28,4,0.0,1.9656
CA-2014-115812,01/01/2014,01/14/2014,Standard Class,BH-11710,Brosina Hoffman,Consumer,United States,Los Angeles,California,90032,West,TEC-PH-10002275,Technology,Phones,Mitel 5320 IP Phone VoIP phone,907.152,6,0.2,90.7152
CA-2014-115812,01/01/2014,01/14/2014,Standard Class,BH-11710,Brosina Hoffman,Consumer,United States,Los Angeles,California,90032,West,OFF-BI-10003910,Office Supplies,Binders,DXL Angle-View Binders with Locking Rings by Samsill,18.504,3,0.2,5.7825
CA-2014-115812,01/01/2014,01/14/2014,Standard Class,BH-11710,Brosina Hoffman,Consumer,United States,Los Angeles,California,90032,West,OFF-AP-10002892,Office Supplies,Appliances,Belkin F5C206VTEL 6 Outlet Surge,114.9,5,0.0,34.47


In [0]:
from pyspark.sql.functions import to_date, date_format, month

df_insert.select('*',month(to_date(col('order_date'), format='MM/dd/yyyy'))).limit(3).display()

order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,country,city,state,postal_code,region,product_id,category,sub_category,product_name,sales,quantity,discount,profit,"month(to_date(order_date, MM/dd/yyyy))"
CA-2016-152156,11/01/2016,11/11/2016,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-BO-10001798,Furniture,Bookcases,Bush Somerset Collection Bookcase,261.96,2,0.0,41.9136,11
CA-2016-152156,11/01/2016,11/11/2016,Second Class,CG-12520,Claire Gute,Consumer,United States,Henderson,Kentucky,42420,South,FUR-CH-10000454,Furniture,Chairs,"Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back",731.94,3,0.0,219.582,11
CA-2016-138688,01/12/2016,01/16/2016,Second Class,DV-13045,Darrin Van Huff,Corporate,United States,Los Angeles,California,90036,West,OFF-LA-10000240,Office Supplies,Labels,Self-Adhesive Address Labels for Typewriters by Universal,14.62,2,0.0,6.8714,1


In [0]:
from pyspark.sql.functions import to_date, date_format

df2 = df_insert.withColumn('order_date', to_date(col('order_date'), format='MM/dd/yyyy'))\
                        .withColumn('ship_date', to_date(col('ship_date'), format='MM/dd/yyyy'))
    # .limit(10).display()

In [0]:
from pyspark.sql.functions import *

df2.select(year(col('order_date'))).limit(5).display()

In [0]:
df2.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- order_date: date (nullable = true)
 |-- ship_date: date (nullable = true)
 |-- ship_mode: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- segment: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- postal_code: long (nullable = true)
 |-- region: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- sales: double (nullable = true)
 |-- quantity: long (nullable = true)
 |-- discount: double (nullable = true)
 |-- profit: double (nullable = true)



In [0]:
# check year month day format
df2.select('order_date').where(day(col('order_date'))>=13).limit(5).display()

order_date
2017-01-15
2015-11-22
2015-11-22
2014-01-13
2014-01-27


In [0]:
# Lagacy content, don't run. You can run but everything will be the same. So don't waste your compute resource
df_date_format = df2.withColumn('order_date', date_format(col('order_date'), 'yyyy-MM-dd'))\
    .withColumn('ship_date', date_format(col('ship_date'), 'yyyy-MM-dd'))

In [0]:
# make another 2 column for partitioning
from pyspark.sql.functions import year, month

df_write = df_date_format.withColumn('year', year(col('order_date')))\
                        .withColumn('month', month(col('order_date')))

In [0]:
# check for data quality. This is not real-world data so it should be good quality.
df_write.select('*').where((col('order_id').isNull()) | (col('customer_id').isNull()) | (col('postal_code').isNull())\
            | (col('product_id').isNull())).display()

order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,country,city,state,postal_code,region,product_id,category,sub_category,product_name,sales,quantity,discount,profit,year,month


In [0]:
# You should setup your AWS and Databricks before run this command
spark.sql(f"CREATE TABLE IF NOT EXISTS rawfile_csv.sss.silver(\
    order_id STRING NOT NULL,\
    ship_date DATE,\
    ship_mode STRING,\
    customer_id STRING,\
    customer_name STRING,\
    segment STRING,\
    country STRING,\
    city STRING,\
    state STRING,\
    postal_code LONG,\
    region STRING,\
    product_id STRING,\
    category VARCHAR(200),\
    sub_category VARCHAR(200),\
    product_name STRING,\
    sales DOUBLE,\
    quantity LONG,\
    discount DOUBLE,\
    profit DOUBLE)\
    PARTITIONED BY (order_date DATE)\
    STORED AS PARQUET\
    LOCATION '{S3_OUTPUT_PATH}'\
    ")

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-7151363212929843>, line 1[0m
[0;32m----> 1[0m spark[38;5;241m.[39msql([38;5;124mf[39m[38;5;124m"[39m[38;5;124mCREATE TABLE IF NOT EXISTS rawfile_csv.sss.silver([39m[38;5;130;01m\[39;00m
[1;32m      2[0m [38;5;124m    order_id STRING NOT NULL,[39m[38;5;130;01m\[39;00m
[1;32m      3[0m [38;5;124m    ship_date DATE,[39m[38;5;130;01m\[39;00m
[1;32m      4[0m [38;5;124m    ship_mode STRING,[39m[38;5;130;01m\[39;00m
[1;32m      5[0m [38;5;124m    customer_id STRING,[39m[38;5;130;01m\[39;00m
[1;32m      6[0m [38;5;124m    customer_name STRING,[39m[38;5;130;01m\[39;00m
[1;32m      7[0m [38;5;124m    segment STRING,[39m[38;5;130;01m\[39;00m
[1;32m      8[0m [38;5;124m    country STRING,[39m[38;5;130;01m\[39;00m
[1;32m      9[0m [38;5;124m    city

In [0]:
df_write.write.partitionBy(col('year'), col('month')).insertInto('rawfile_csv.sss.silver')

### The commands below are for write-only, used for practice
In the real world scenario, we typically create table and insert like the above. Then, our data will be queryable. 
  
But if you have a problem with your **Databricks on AWS** and still want to learn about Medallion, I suggest you just use the command below then read the parquet and make it a view to make it queryable. Keep in my that, this is not a good practice, just for practice for understanding the concept.

In [0]:
spark.sql('create volume if not exists path.to.volume')

DataFrame[]

In [0]:
df_write.write.parquet('/Volumes/rawfile_csv/sss/silver/', mode='overwrite', partitionBy=['year', 'month'])

In [0]:
# try our writing partition

try_df = spark.read.parquet('/Volumes/rawfile_csv/sss/silver/')
try_df.select('*').where((col('year')==2014) & (col('month')==1)).limit(20).display()

order_id,order_date,ship_date,ship_mode,customer_id,customer_name,segment,country,city,state,postal_code,region,product_id,category,sub_category,product_name,sales,quantity,discount,profit,year,month
US-2014-150126,2014-01-27,2014-01-01,Standard Class,AS-10045,Aaron Smayling,Corporate,United States,New York City,New York,10035,East,OFF-PA-10002709,Office Supplies,Paper,Xerox 1956,65.78,11,0.0,32.2322,2014,1
US-2014-121734,2014-01-11,2014-01-16,Standard Class,SE-20110,Sanjit Engle,Consumer,United States,Lewiston,Idaho,83501,West,OFF-BI-10004817,Office Supplies,Binders,GBC Personal VeloBind Strips,9.584,1,0.2,3.3544,2014,1
CA-2014-169460,2014-01-19,2014-01-21,Second Class,NF-18595,Nicole Fjeld,Home Office,United States,San Jose,California,95123,West,FUR-FU-10004017,Furniture,Furnishings,"""Executive Impressions 13"""" Chairman Wall Clock""",76.14,3,0.0,26.649,2014,1
CA-2014-110219,2014-01-01,2014-01-01,First Class,EB-13870,Emily Burns,Consumer,United States,San Antonio,Texas,78207,Central,FUR-CH-10001146,Furniture,Chairs,"Global Value Mid-Back Manager's Chair, Gray",127.869,3,0.3,-9.13350000000001,2014,1
US-2014-165862,2014-01-13,2014-01-17,Standard Class,GK-14620,Grace Kelly,Corporate,United States,Los Angeles,California,90049,West,FUR-TA-10002855,Furniture,Tables,Bevis Round Conference Table Top & Single Column Base,351.216,3,0.2,4.39019999999998,2014,1
CA-2014-109491,2014-01-20,2014-01-26,Standard Class,LC-16930,Linda Cazamias,Corporate,United States,Richmond,Indiana,47374,Central,FUR-FU-10000221,Furniture,Furnishings,"Master Caster Door Stop, Brown",20.32,4,0.0,6.9088,2014,1
CA-2014-109491,2014-01-20,2014-01-26,Standard Class,LC-16930,Linda Cazamias,Corporate,United States,Richmond,Indiana,47374,Central,TEC-AC-10001284,Technology,Accessories,Enermax Briskie RF Wireless Keyboard and Mouse Combo,62.31,3,0.0,22.4316,2014,1
CA-2014-116757,2014-01-30,2014-01-01,Standard Class,MS-17980,Michael Stewart,Corporate,United States,Houston,Texas,77095,Central,OFF-FA-10002815,Office Supplies,Fasteners,Staples,21.312,6,0.2,7.1928,2014,1
CA-2014-116757,2014-01-30,2014-01-01,Standard Class,MS-17980,Michael Stewart,Corporate,United States,Houston,Texas,77095,Central,OFF-PA-10002005,Office Supplies,Paper,Xerox 225,25.92,5,0.2,9.072,2014,1
CA-2014-147235,2014-01-24,2014-01-28,Standard Class,CD-11920,Carlos Daly,Consumer,United States,New York City,New York,10024,East,OFF-PA-10004948,Office Supplies,Paper,Xerox 190,24.9,5,0.0,11.703,2014,1


### Before we leave
This is the end of bronze-to-silver notebook. I think it will be better if we use notebook for bronze to silver and silver to gold separately. See you in the next notebook (sss_silver_gold.ipynb)