In [65]:
import os
import subprocess

import gdown
import pyspark
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [None]:
# Service-account key file handed to the GCS connector. The standard
# GOOGLE_APPLICATION_CREDENTIALS environment variable wins when it is set, so
# the notebook is not tied to one user's home directory; the previous
# hard-coded location remains the fallback.
credentials_location = os.environ.get(
    'GOOGLE_APPLICATION_CREDENTIALS',
    '/home/datatalks_jan/.google/credentials/google_credentials.json',
)

In [None]:
# Local Spark configuration: GCS connector jar plus service-account
# authentication using the key file referenced by credentials_location.
conf = (
    SparkConf()
    .setMaster('local[*]')
    .setAppName('conrad_test')
    .set("spark.jars", "./lib/gcs-connector-hadoop3-2.2.5.jar")
    .set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    .set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", credentials_location)
)

In [None]:
# getOrCreate returns the already-running SparkContext when there is one, so
# re-running this cell no longer fails the way a second SparkContext(conf=conf)
# call does. Note that `conf` is only applied when a new context is created.
sc = SparkContext.getOrCreate(conf=conf)
# Handle on the underlying Hadoop configuration of this context.
hadoop_conf = sc._jsc.hadoopConfiguration()

In [4]:
# Location of the zipped demo dataset and where it is stored locally.
source_url = 'https://drive.google.com/uc?id=1vcb_HBWsOSKW4XxhLfRpGlLzBLwHlGWJ'
output_path = 'source_data/multi_source_data'
output_file = os.path.join(output_path, 'multi_source_demo.zip')

# Create the target directory up front so the cell also works on a fresh
# checkout where source_data/ does not exist yet.
os.makedirs(output_path, exist_ok=True)

gdown.download(source_url, output_file, quiet=False)

Downloading...
From (original): https://drive.google.com/uc?id=1vcb_HBWsOSKW4XxhLfRpGlLzBLwHlGWJ
From (redirected): https://drive.google.com/uc?id=1vcb_HBWsOSKW4XxhLfRpGlLzBLwHlGWJ&confirm=t&uuid=089e7409-dbfd-43d6-ae68-bdcdc16b18c5
To: /home/datatalks_jan/Data_Eden/8_pySpark_pilot/source_data/multi_source_data/multi_source_demo.zip
100%|█████████████████████████████████████████████████████████████| 154M/154M [00:00<00:00, 229MB/s]


'source_data/multi_source_data/multi_source_demo.zip'

In [5]:
# Extract the archive next to it, overwriting existing files (-o).
# The arguments are passed as a list, so no shell re-parses the paths, and
# check=True raises CalledProcessError if unzip fails instead of silently
# returning a non-zero status. The exit code is still the cell's value.
subprocess.run(['unzip', '-o', output_file, '-d', output_path], check=True).returncode

Archive:  source_data/multi_source_data/multi_source_demo.zip
  inflating: source_data/multi_source_data/distribution_centers.csv  
  inflating: source_data/multi_source_data/events.csv  
  inflating: source_data/multi_source_data/inventory_items.csv  
  inflating: source_data/multi_source_data/order_items.csv  
  inflating: source_data/multi_source_data/orders.csv  
  inflating: source_data/multi_source_data/products.csv  
  inflating: source_data/multi_source_data/readme.txt  
  inflating: source_data/multi_source_data/users.csv  


0

In [6]:
# Build (or reuse) the SparkSession using the SparkContext's configuration.
spark = (
    SparkSession.builder
    .config(conf=sc.getConf())
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/03 10:57:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [50]:
# Load the users and distribution-center CSVs, treating the first row as the header.
df_1_users = spark.read.csv(f'{output_path}/users.csv', header=True)
df_1_distri_centers = spark.read.csv(f'{output_path}/distribution_centers.csv', header=True)

                                                                                

In [52]:
# Coordinates are cast to float on both frames so they can be used in the
# distance computation.
df_1_users = (
    df_1_users
    .withColumn("latitude", col("latitude").cast("float"))
    .withColumn("longitude", col("longitude").cast("float"))
)
df_1_distri_centers = (
    df_1_distri_centers
    .withColumn("latitude", col("latitude").cast("float"))
    .withColumn("longitude", col("longitude").cast("float"))
)

In [67]:
# Confirm that latitude/longitude are now float columns.
df_1_distri_centers.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- latitude: float (nullable = true)
 |-- longitude: float (nullable = true)



In [110]:
from pyspark.sql.functions import col, acos, cos, sin, radians, atan2, sqrt

In [111]:
def haversine_distance(lat1, lon1, lat2, lon2, earth_radius_km=6371):
    """Great-circle distance between two points using the haversine formula.

    The computation is expressed with the `radians`, `sin`, `cos`, `atan2`
    and `sqrt` functions in scope; in this notebook those are the
    pyspark.sql.functions versions, so the arguments are Column expressions
    and the result is a Column.

    Args:
        lat1: Latitude of the first point, in degrees.
        lon1: Longitude of the first point, in degrees.
        lat2: Latitude of the second point, in degrees.
        lon2: Longitude of the second point, in degrees.
        earth_radius_km: Sphere radius that scales the central angle into a
            distance. Defaults to 6371 (kilometres), the value previously
            hard-coded here.

    Returns:
        The distance along the sphere, in the unit of `earth_radius_km`.
    """
    # The trigonometric functions work in radians; inputs are in degrees.
    lat1, lon1, lat2, lon2 = (radians(angle) for angle in (lat1, lon1, lat2, lon2))
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    # Haversine of the central angle between the two points.
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    # Central angle in radians.
    c = 2 * atan2(sqrt(a), sqrt(1 - a))
    return earth_radius_km * c

In [135]:
# Expose both frames to Spark SQL under the names used by the query below.
df_1_distri_centers.createOrReplaceTempView("distribution_centers")
df_1_users.createOrReplaceTempView("users")

In [176]:
# Pair every user with every distribution center (CROSS JOIN), keeping the
# coordinates of both sides so a distance can be computed for each pair.
df_1_sql_query = """
    SELECT u.id as user_id, u.age as user_age, u.country, u.state,
           dc.id AS distribution_center_id,dc.name AS distribution_center_name,
           u.latitude as user_latitude, u.longitude as user_longitude, dc.latitude as center_latitude, dc.longitude as center_longitude
    FROM users u
    CROSS JOIN distribution_centers dc
"""

In [146]:
# Attach the haversine distance between the user and the center to every pair.
df_1_with_distance = spark.sql(df_1_sql_query).withColumn(
    "distance",
    haversine_distance(
        col("user_latitude"),
        col("user_longitude"),
        col("center_latitude"),
        col("center_longitude"),
    ),
)
df_1_with_distance.show()



+-------+--------+-------+---------+----------------------+------------------------+-------------+--------------+---------------+----------------+------------------+
|user_id|user_age|country|    state|distribution_center_id|distribution_center_name|user_latitude|user_longitude|center_latitude|center_longitude|          distance|
+-------+--------+-------+---------+----------------------+------------------------+-------------+--------------+---------------+----------------+------------------+
|      1|      70|  China|   Shanxi|                     1|              Memphis TN|    36.147415|      113.1227|        35.1174|        -89.9711|11737.842558916114|
|      1|      70|  China|   Shanxi|                     2|              Chicago IL|    36.147415|      113.1227|        41.8369|        -87.6847|11089.054948665944|
|      1|      70|  China|   Shanxi|                     3|              Houston TX|    36.147415|      113.1227|        29.7604|        -95.3698|12104.974024220499|
|   

                                                                                

In [150]:
from pyspark.sql.window import Window
# Number each user's (user, center) pairs by ascending distance ...
windowSpec = Window.partitionBy(col("user_id")).orderBy(col("distance"))
df_with_row_number = df_1_with_distance.withColumn(
    "row_number",
    row_number().over(windowSpec),
)
# ... and keep only the first one, i.e. the closest distribution center.
df_1_final = df_with_row_number.where(col("row_number") == 1)
df_1 = (
    df_1_final
    .select(
        "user_id", "user_age", "country", "state",
        "distribution_center_id", "distribution_center_name", "distance",
    )
    .orderBy("user_id")
)
df_1.show()
df_1.printSchema()

[Stage 108:>                                                        (0 + 2) / 2]

+-------+--------+--------------+--------------------+----------------------+------------------------+------------------+
|user_id|user_age|       country|               state|distribution_center_id|distribution_center_name|          distance|
+-------+--------+--------------+--------------------+----------------------+------------------------+------------------+
|      1|      70|         China|              Shanxi|                     4|          Los Angeles CA|10564.978611447585|
|     10|      18|         China|           Guangdong|                     4|          Los Angeles CA|11683.981878339073|
|    100|      32|         China|           Guangdong|                     4|          Los Angeles CA|11433.559166566682|
|   1000|      49|         China|           Guangdong|                     4|          Los Angeles CA|11628.109441907302|
|  10000|      38|         China|           Guangdong|                     4|          Los Angeles CA|11375.162419660232|
| 100000|      34|      

[Stage 111:>                                                        (0 + 2) / 2]                                                                                

In [151]:
# Load orders and their line items, treating the first CSV row as the header.
df_2_orders = spark.read.csv(f'{output_path}/orders.csv', header=True)
df_2_order_items = spark.read.csv(f'{output_path}/order_items.csv', header=True)

In [173]:
# Parse created_at as a timestamp and keep only the orders created in 2023.
df_2_orders_last_year = (
    df_2_orders
    .withColumn("created_at", col("created_at").cast("timestamp"))
    .where(year(col("created_at")) == 2023)
)
# Preview the first 100 of those orders in chronological order.
df_2_orders_last_year.orderBy("created_at").show(100)



+--------+-------+----------+------+-------------------+--------------------+--------------------+--------------------+-----------+
|order_id|user_id|    status|gender|         created_at|         returned_at|          shipped_at|        delivered_at|num_of_item|
+--------+-------+----------+------+-------------------+--------------------+--------------------+--------------------+-----------+
|  108218|  86249|  Complete|     M|2023-01-01 00:00:00|                null|2023-01-03 10:19:...|2023-01-05 00:57:...|          1|
|   89472|  71296|  Complete|     M|2023-01-01 00:02:00|                null|2023-01-03 09:01:...|2023-01-03 19:03:...|          1|
|  117415|  93606|  Returned|     M|2023-01-01 00:16:00|2023-01-06 20:42:...|2023-01-01 16:31:...|2023-01-05 12:12:...|          1|
|  105710|  84267|   Shipped|     F|2023-01-01 00:19:00|                null|2023-01-02 02:19:...|                null|          1|
|   67811|  54111|  Complete|     F|2023-01-01 00:20:00|                null

                                                                                

In [162]:
# Register the line items and the 2023 orders as Spark SQL views.
df_2_order_items.createOrReplaceTempView("order_items")
df_2_orders_last_year.createOrReplaceTempView("orders_last_year")

In [180]:
# Line items restricted to orders present in the orders_last_year view
# (inner join on order_id); user, product, status and price come from the
# order_items side.
df_2_sql = spark.sql("""
    SELECT t2.order_id, t1.user_id, t1.product_id, t1.status, t1.sale_price
    FROM  order_items as t1
    INNER JOIN orders_last_year as t2 on t2.order_id = t1.order_id
""")

In [181]:
# Preview the joined line items.
df_2_sql.show()

                                                                                

+--------+-------+----------+----------+----------+
|order_id|user_id|product_id|    status|sale_price|
+--------+-------+----------+----------+----------+
|    7401|   5955|     13606|   Shipped|       2.5|
|   15156|  12140|     13606|   Shipped|       2.5|
|  112131|  89401|     13606|  Returned|       2.5|
|  109741|  87498|     13696|   Shipped|       3.5|
|   40873|  32792|     13696|  Complete|       3.5|
|   45837|  36739|     28679|  Complete|       3.5|
|   51252|  41004|     28679|  Complete|       3.5|
|   82249|  65559|     28668|  Complete|       3.5|
|   44958|  36034|     15917|  Returned|       3.5|
|   50940|  40764|     14248|  Returned|       3.5|
|   74499|  59387|     28668|  Returned|       3.5|
|    2159|   1733|     15784| Cancelled|       3.5|
|  102081|  81377|     15419| Cancelled|       3.5|
|   20438|  16354|     15917|Processing|       3.5|
|   86795|  69129|     12602| Cancelled|      3.75|
|   58472|  46791|     12667|   Shipped|         4|
|   87586|  

In [182]:
# Make the joined line items queryable from Spark SQL as "order_details".
df_2_sql.createOrReplaceTempView("order_details")

In [191]:
# Per-user return rate: the number of line items whose status is 'Returned'
# divided by the number of line items. The CASE has no ELSE branch, so
# non-returned rows yield NULL and are not counted by COUNT.
df_2 = spark.sql("""
    SELECT user_id, COUNT(CASE WHEN status = 'Returned' THEN product_id END) / COUNT(product_id) as product_return_rate
    FROM  order_details
    GROUP BY user_id
""")
df_2.show()



+-------+-------------------+
|user_id|product_return_rate|
+-------+-------------------+
|  55371| 0.3333333333333333|
|  76321|                0.0|
|  67728|                0.0|
|  83450|                0.0|
|  50254|                0.0|
|   5645|                0.0|
|  36067|                0.0|
|  26005| 0.6666666666666666|
|  49079|                1.0|
|  16974|                0.0|
|  75602|                0.0|
|  37774|                0.0|
|  51550|                0.0|
|  52991|                0.0|
|  42776|                0.0|
|  45141|                0.0|
|  90022|                0.0|
|  29573|                0.0|
|  65548|                0.0|
|  67074|                0.0|
+-------+-------------------+
only showing top 20 rows



                                                                                

In [184]:
# Load the product catalogue, treating the first CSV row as the header.
df_3_product_price = spark.read.csv(f'{output_path}/products.csv', header=True)

In [185]:
# Only the product id and its cost are needed for the profit computation.
df_3_product_price = df_3_product_price.select(col("id"), col("cost"))

In [187]:
# Make the id/cost pairs queryable from Spark SQL as "product_price".
df_3_product_price.createOrReplaceTempView("product_price")

In [192]:
# Per-user total of (sale_price - cost) over the rows of order_details.
# The LEFT JOIN keeps line items without a matching product; their cost is
# NULL, the difference is NULL, and SUM skips them.
# NOTE(review): order_details is not filtered by status, so cancelled and
# returned items appear to count towards profit_total - confirm this is intended.
# NOTE(review): the CSVs are read without a schema, so sale_price and cost
# presumably arrive as strings and are cast implicitly here - confirm.
df_3 = spark.sql("""
    SELECT user_id, SUM(T1.sale_price - T2.cost) as profit_total
    FROM  order_details AS T1
    LEFT JOIN product_price AS T2 ON T2.id = T1.product_id
    GROUP BY user_id
""")

df_3.show()



+-------+------------------+
|user_id|      profit_total|
+-------+------------------+
|  55371| 36.56049993814668|
|  76321|3.6499999836087236|
|  67728| 6.160000022500753|
|  83450|34.115999944508076|
|  50254| 7.504000009968877|
|   5645| 80.19799986854196|
|  36067| 45.91799999959767|
|  26005| 58.43153086270138|
|  49079| 76.41999986022711|
|  16974|164.59368116655358|
|  75602|245.21341720238505|
|  37774| 9.614000018686056|
|  51550|104.69988313152537|
|  52991|50.305400310753285|
|  42776|11.711999997496603|
|  45141| 30.59799999045208|
|  90022|24.426070789695085|
|  29573| 50.58567986480862|
|  65548|13.607999987900255|
|  67074| 13.74799995869398|
+-------+------------------+
only showing top 20 rows



                                                                                

In [196]:
# Combine the three per-user results on user_id (inner joins, so only users
# present in all three remain), keep the reporting columns and store the age
# as an integer.
df_final = (
    df_1.join(df_2, "user_id")
    .join(df_3, "user_id")
    .select(
        "user_id", "user_age", "country", "state",
        "distribution_center_id", "distribution_center_name",
        "product_return_rate", "profit_total",
    )
    .withColumn("user_age", col("user_age").cast("int"))
)

df_final.show()
df_final.printSchema()

                                                                                

+-------+--------+-------------+--------------------+----------------------+------------------------+-------------------+------------------+
|user_id|user_age|      country|               state|distribution_center_id|distribution_center_name|product_return_rate|      profit_total|
+-------+--------+-------------+--------------------+----------------------+------------------------+-------------------+------------------+
|  10003|      34|        China|Inner Mongolia Au...|                     4|          Los Angeles CA|                0.0|15.735300476637484|
|  10009|      38|       Brasil|               Bahia|                     9|           Charleston SC|                0.0|175.31354011801167|
|  10023|      68|        China|           Guangdong|                     4|          Los Angeles CA|                0.0|21.074899063354728|
|  10039|      34|      Belgium|            Flanders|                     6|    Port Authority of...|                0.0|24.584000065922737|
|  10047|    

[Stage 242:>                                                        (0 + 1) / 1]                                                                                

In [None]:
# Write the combined per-user result to GCS, replacing any previous output.
# The frame is named df_final; the previous `final_df` was never defined and
# raised NameError.
df_final.write.csv("gs://my-zhe-414813/conrad_output", mode="overwrite")