# Combine physical store returns data with store addresses and export it as a Parquet file

### 1) Create a Spark session

In [1]:
from pyspark.sql import SparkSession

# Build (or reuse, via getOrCreate) a Spark session that runs on YARN and has
# Hive support enabled. The parentheses let the builder chain span several
# lines without backslash continuations.
spark = (
    SparkSession.builder
    .appName("Physical Store Returns Processing")
    .master("yarn")
    .enableHiveSupport()
    .getOrCreate()
)

In [2]:
# Evaluate the session as the cell's last expression so Jupyter displays it,
# confirming the session is active.
spark

### 2) Load store returns data from CSV files into a DataFrame

In [3]:
# Load every store_returns_*.csv file from Google Cloud Storage into one DataFrame.
# header: take column names from each file's first row.
# inferSchema: let Spark derive column types from the data instead of reading
# everything as strings.
# NOTE(review): despite its name, this DataFrame holds the store *returns* data;
# the same variable name is reassigned to the stores data in step 3.
stores_dataframe = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("gs://sureskills-lab-dev/DAC2M4L1/store_returns_input/store_returns_*.csv")
)


                                                                                

In [4]:
# Print the DataFrame schema to review data types and field names.
# The types shown were inferred because the CSV read used inferSchema=True.
stores_dataframe.printSchema()

root
 |-- order_id: integer (nullable = true)
 |-- rma_id: integer (nullable = true)
 |-- return_status: string (nullable = true)
 |-- status_date: string (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- quantity_returned: integer (nullable = true)
 |-- store_id: integer (nullable = true)
 |-- row_number: integer (nullable = true)



In [5]:
# Preview the first 3 rows with full column values (truncate=False).
# DataFrame.show() prints the rows itself and returns None, so it is called
# directly rather than wrapped in display(), which would only render "None".
stores_dataframe.show(3, truncate=False)

+--------+------+-------------+-----------------------+----------+-----------------+--------+----------+
|order_id|rma_id|return_status|status_date            |product_id|quantity_returned|store_id|row_number|
+--------+------+-------------+-----------------------+----------+-----------------+--------+----------+
|44176   |39613 |in progress  |null                   |25059     |1                |20000   |1060      |
|16558   |1440  |complete     |2020-03-18 13:46:00 UTC|8875      |1                |20000   |2920      |
|29503   |34936 |complete     |null                   |3883      |1                |20000   |8241      |
+--------+------+-------------+-----------------------+----------+-----------------+--------+----------+
only showing top 3 rows



None

In [6]:
# Register the returns DataFrame under the name "store_returns" so that Spark SQL
# queries can refer to it like a table. An existing view with that name is replaced.
stores_dataframe.createOrReplaceTempView(name="store_returns")

In [7]:
# Query the store_returns temporary view with Spark SQL. The view is referenced
# exactly like a table in the FROM clause. Print the first 20 rows, which is
# show()'s default row count.
spark.sql("select * from store_returns").show(20)

+--------+------+-------------+--------------------+----------+-----------------+--------+----------+
|order_id|rma_id|return_status|         status_date|product_id|quantity_returned|store_id|row_number|
+--------+------+-------------+--------------------+----------+-----------------+--------+----------+
|   44176| 39613|  in progress|                null|     25059|                1|   20000|      1060|
|   16558|  1440|     complete|2020-03-18 13:46:...|      8875|                1|   20000|      2920|
|   29503| 34936|     complete|                null|      3883|                1|   20000|      8241|
|   89574| 58184|  in progress|2023-02-17 05:34:...|     28858|                1|   20000|      5684|
|   10304| 33793|      unknown|2023-01-25 15:29:...|     17923|                1|   20000|      7872|
|   49339| 46723|     complete|2023-04-25 04:29:...|     27420|                1|   20000|      3497|
|   12543| 53788|      unknown|                null|      9418|                1| 

### 3) Load store addresses from CSV files

In [8]:
# Load every stores_*.csv file from Google Cloud Storage (GCS) into a DataFrame,
# using the header row for column names and inferring column types.
# NOTE: this reassigns stores_dataframe, which previously held the store returns
# data. The store_returns temporary view registered earlier keeps referring to
# that returns data and is not affected by the reassignment.
stores_dataframe = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("gs://sureskills-lab-dev/DAC2M4L1/store_returns_input/stores_*.csv")
)


In [9]:
# Print the schema of the stores DataFrame to check its field names and inferred types
stores_dataframe.printSchema()

root
 |-- store_id: integer (nullable = true)
 |-- street_address: string (nullable = true)



In [10]:
# Expose the stores DataFrame to Spark SQL as the temporary view "stores".
stores_dataframe.createOrReplaceTempView(name="stores")

In [11]:
# Display the first 20 rows (show()'s default) of the stores view via Spark SQL.
spark.sql("select * from stores").show(20)

+--------+--------------------+
|store_id|      street_address|
+--------+--------------------+
|   20000|000 Amber Viaduct...|
|   20001|000 Antonio Mountain|
|   20002|   000 Barnes Estate|
|   20003|  000 Brenda Landing|
|   20004|000 Cheryl Inlet ...|
|   20005|000 Cynthia Alley...|
|   20006|     000 Dale Bridge|
|   20007|      000 Dana Shore|
|   20008|    000 Daniel Plain|
|   20009|000 Darrell Mount...|
|   20010|   000 Dylan Mission|
|   20011|  000 Fisher Village|
|   20012|000 Frances Strav...|
|   20013|000 Holland Sprin...|
|   20014|000 Janet Via Sui...|
|   20015|000 Jasmine Harbo...|
|   20016|000 Jeffery Under...|
|   20017|000 Jeffrey Walk ...|
|   20018|    000 Johnson Flat|
|   20019|000 Kennedy Villa...|
+--------+--------------------+
only showing top 20 rows



### 4) Add the store street address to the returns data and export it as a Parquet file

In [12]:
# Join the store returns (store_returns view) with the store information (stores view)
# on store_id to attach each return's street_address. The result populates a new
# DataFrame, which is then displayed.
# The inner join keeps only returns whose store_id has a matching row in stores.
# row_number from the returns data is not in the select list, so it is dropped.
dataframe_to_export = spark.sql('''
                                    select  
                                      sr.order_id, 
                                      sr.rma_id,  
                                      sr.return_status,         
                                      sr.status_date,  
                                      sr.product_id,  
                                      sr.quantity_returned,  
                                      sr.store_id,  
                                      s.street_address  
                                    from store_returns sr  
                                    inner join stores as s  
                                    on sr.store_id = s.store_id    
                                ''')
dataframe_to_export.show()

+--------+------+-------------+--------------------+----------+-----------------+--------+--------------------+
|order_id|rma_id|return_status|         status_date|product_id|quantity_returned|store_id|      street_address|
+--------+------+-------------+--------------------+----------+-----------------+--------+--------------------+
|   44176| 39613|  in progress|                null|     25059|                1|   20000|000 Amber Viaduct...|
|   16558|  1440|     complete|2020-03-18 13:46:...|      8875|                1|   20000|000 Amber Viaduct...|
|   29503| 34936|     complete|                null|      3883|                1|   20000|000 Amber Viaduct...|
|   89574| 58184|  in progress|2023-02-17 05:34:...|     28858|                1|   20000|000 Amber Viaduct...|
|   10304| 33793|      unknown|2023-01-25 15:29:...|     17923|                1|   20000|000 Amber Viaduct...|
|   49339| 46723|     complete|2023-04-25 04:29:...|     27420|                1|   20000|000 Amber Viad

In [13]:
# Find the Google Cloud Project ID.
PROJECT_ID=!gcloud info --format='value(config.project)'

# Export dataframe as a Parquet File - overwrite the file if it exists
# Note that a long line is broken off using the backlash (\)
# The Google Cloud Storage bucket name in this lab is the name of the project id
# We are using the project_id value obtained in the prior step in the gs:// path
dataframe_to_export.write\
  .mode("overwrite")\
  .save("gs://" + PROJECT_ID[0] + "/store_returns_output/store_returns.parquet")

                                                                                

#### 4.2. Modify a query

In [14]:
# Modify the query below so that the field name displayed is store_address instead of street_address
# Exercise cell: left unchanged on purpose, so the select list still outputs street_address.
# Note: this reassigns dataframe_to_export after the Parquet export cell has already run,
# so the exported files are not affected by changes made here.
dataframe_to_export = spark.sql('''
                                    select  
                                      sr.order_id, 
                                      sr.rma_id,  
                                      sr.return_status,         
                                      sr.status_date,  
                                      sr.product_id,  
                                      sr.quantity_returned,  
                                      sr.store_id,  
                                      s.street_address 
                                    from store_returns sr  
                                    inner join stores as s  
                                    on sr.store_id = s.store_id    
                                ''')
dataframe_to_export.show()

# Hint: the answer is in the next cell

+--------+------+-------------+--------------------+----------+-----------------+--------+--------------------+
|order_id|rma_id|return_status|         status_date|product_id|quantity_returned|store_id|      street_address|
+--------+------+-------------+--------------------+----------+-----------------+--------+--------------------+
|   44176| 39613|  in progress|                null|     25059|                1|   20000|000 Amber Viaduct...|
|   16558|  1440|     complete|2020-03-18 13:46:...|      8875|                1|   20000|000 Amber Viaduct...|
|   29503| 34936|     complete|                null|      3883|                1|   20000|000 Amber Viaduct...|
|   89574| 58184|  in progress|2023-02-17 05:34:...|     28858|                1|   20000|000 Amber Viaduct...|
|   10304| 33793|      unknown|2023-01-25 15:29:...|     17923|                1|   20000|000 Amber Viaduct...|
|   49339| 46723|     complete|2023-04-25 04:29:...|     27420|                1|   20000|000 Amber Viad

In [15]:
# This query modifies the field name to display store_address instead of street_address.
# The column alias (`as store_address`) renames only the output column; the stores
# view itself still names the field street_address.
# Note: the Parquet export ran in an earlier cell, so the exported files still contain
# street_address. Re-run the export after this cell to write the renamed column.
dataframe_to_export = spark.sql('''
                                    select  
                                      sr.order_id, 
                                      sr.rma_id,  
                                      sr.return_status,         
                                      sr.status_date,  
                                      sr.product_id,  
                                      sr.quantity_returned,  
                                      sr.store_id,  
                                      s.street_address as store_address
                                    from store_returns sr  
                                    inner join stores as s  
                                    on sr.store_id = s.store_id    
                                ''')
dataframe_to_export.show()

+--------+------+-------------+--------------------+----------+-----------------+--------+--------------------+
|order_id|rma_id|return_status|         status_date|product_id|quantity_returned|store_id|       store_address|
+--------+------+-------------+--------------------+----------+-----------------+--------+--------------------+
|   44176| 39613|  in progress|                null|     25059|                1|   20000|000 Amber Viaduct...|
|   16558|  1440|     complete|2020-03-18 13:46:...|      8875|                1|   20000|000 Amber Viaduct...|
|   29503| 34936|     complete|                null|      3883|                1|   20000|000 Amber Viaduct...|
|   89574| 58184|  in progress|2023-02-17 05:34:...|     28858|                1|   20000|000 Amber Viaduct...|
|   10304| 33793|      unknown|2023-01-25 15:29:...|     17923|                1|   20000|000 Amber Viaduct...|
|   49339| 46723|     complete|2023-04-25 04:29:...|     27420|                1|   20000|000 Amber Viad