In [17]:
import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
# Start (or reuse, via getOrCreate) the Spark session used by every later cell.
spark = SparkSession.builder.appName("SampleSparkCode").getOrCreate()

In [18]:
# Load the three raw CSV extracts; the first row of each file is the header.
# No schema / inferSchema is given, so every column is read as a string.
# Paths use forward slashes: Spark/Hadoop accept them on every OS, and they avoid
# the invalid "\d", "\A", "\C" escape sequences (SyntaxWarning on Python 3.12+)
# that the original Windows-style backslash paths contained.
df_address = spark.read.csv("../data/Address.csv", header=True)
df_customer = spark.read.csv("../data/Customer.csv", header=True)
df_customer_address = spark.read.csv("../data/CustomerAddress.csv", header=True)

In [19]:
# Customer-address dimension: each customer joined through the CustomerAddress
# bridge table to its address rows. Both joins are inner joins, so customers
# without a matching CustomerAddress/Address row are excluded.
df_dim_customer_address = (
    df_customer.alias("c")
    .join(
        df_customer_address.alias("ca"),
        on=col("c.CustomerID") == col("ca.CustomerID"),
        how="inner",
    )
    .join(
        df_address.alias("a"),
        on=col("ca.AddressID") == col("a.AddressID"),
        how="inner",
    )
    .select(
        # Join keys appear on both sides of a join, so pick one side explicitly.
        col("c.CustomerID").alias("CustomerID"),
        col("ca.AddressID").alias("AddressID"),
        # Customer attributes
        "c.FirstName", "c.LastName", "c.CompanyName", "c.EmailAddress", "c.Phone",
        # Address attributes
        "ca.AddressType",
        "a.AddressLine1", "a.AddressLine2", "a.City", "a.StateProvince",
        "a.CountryRegion", "a.PostalCode",
    )
)

In [20]:
# Preview the first five rows (show() truncates long strings by default).
df_dim_customer_address.show(n=5)

+----------+---------+---------+-----------+--------------------+--------------------+------------+-----------+-----------------+------------+-----------+-------------+-------------+----------+
|CustomerID|AddressID|FirstName|   LastName|         CompanyName|        EmailAddress|       Phone|AddressType|     AddressLine1|AddressLine2|       City|StateProvince|CountryRegion|PostalCode|
+----------+---------+---------+-----------+--------------------+--------------------+------------+-----------+-----------------+------------+-----------+-------------+-------------+----------+
|     29485|     1086|Catherine|       Abel|Professional Sale...|catherine0@advent...|747-555-0171|Main Office|57251 Serene Blvd|        NULL|   Van Nuys|   California|United States|     91411|
|     29486|      621|      Kim|Abercrombie|      Riders Company|kim2@adventure-wo...|334-555-0137|Main Office|   Tanger Factory|        NULL|     Branch|    Minnesota|United States|     55056|
|     29489|     1069|  France

In [39]:
# Remove any previous catalog entry so the next cell can recreate the table.
spark.sql(sqlQuery="drop table if exists dim_customer_address")

DataFrame[]

- Partition by State
- `partitionBy` works with both `save()` and `saveAsTable()`. With `save()`, you can pass any relative path and the files are written there.

In [40]:
# Write the dimension as Parquet, one sub-directory per StateProvince value,
# and register it in the catalog as `dim_customer_address`.
# - Forward slashes: Spark/Hadoop accept them on every OS, and they avoid the
#   invalid "\p", "\e", "\d" escape sequences (SyntaxWarning on Python 3.12+).
# - mode("overwrite"): the explicit `path` option makes this an external table,
#   so the preceding DROP TABLE removes only the catalog entry and leaves the
#   files behind; with the default errorifexists mode a re-run would then fail
#   because the target location is not empty.
(
    df_dim_customer_address.write.format("parquet")
    .partitionBy("StateProvince")
    .option("path", "../processed/example_partition_by/dimcustomer")
    .mode("overwrite")
    .saveAsTable("dim_customer_address")
)

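- Bucket by State (note that `bucketBy` does not support `save()`; it works only with `saveAsTable()`).
- Remember that `bucketBy` does not take a relative path: the table points to the warehouse directory, which by default is where Spark is running. This is also where Spark's default metastore resides. (The cell below nevertheless passes an explicit relative `path` option — verify where the files actually land.)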

In [41]:
# Drop the partitioned table's catalog entry before recreating it as a bucketed
# table. Its files stay on disk because it was created with an explicit path.
spark.sql(sqlQuery="drop table if exists dim_customer_address")

DataFrame[]

In [42]:
# Write the dimension as a bucketed Parquet table: rows are hashed on
# StateProvince into 10 buckets and sorted by (StateProvince, City) within each
# bucket. bucketBy is only supported together with saveAsTable().
# The path uses forward slashes, which work on every OS and avoid the invalid
# "\p", "\e", "\d" escape sequences of the original backslash path.
# mode("overwrite") replaces whatever a previous run left at this location.
(
    df_dim_customer_address.write.format("parquet")
    .bucketBy(10, "StateProvince")
    .sortBy("StateProvince", "City")
    .option("path", "../processed/example_bucket_by/dimcustomer")
    .mode("overwrite")
    .saveAsTable("dim_customer_address")
)
