### Starting reading the sql file and executing

In [1]:

import os
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import sqlite3
spark = SparkSession.builder.getOrCreate()
conn = sqlite3.connect('hubs.db')
cur = conn.cursor()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/26 10:33:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/26 10:33:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
#Mounting a dinamic path
init_path = os.getcwd()
file_name = '/hubs_events.sql'
final_path = init_path + file_name

with open(final_path, 'r') as sql_file:
    query = conn.executescript(sql_file.read())

result = cur.execute("SELECT * FROM MY_TABLE").fetchall()
cur.execute("DROP TABLE MY_TABLE")
conn.close()
#calling timestamp as timestamp_datetime to avoid ambiguous columns when we exploid the data column
df_spark = spark.createDataFrame(result).toDF('id','timestamp_datetime','name','data')


In [3]:
df_spark.show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+
|                  id|  timestamp_datetime|                name|                data|
+--------------------+--------------------+--------------------+--------------------+
|api-XJQFMJdJnp6E8...|2017-01-04T20:05:...|order/execute/cus...|{"event": "order/...|
|04998e62-44de-480...|2017-03-11T18:10:...|order/execute/cus...|{"event": "order/...|
|fa356ead-ec8f-49e...|2017-03-25T03:08:...|order/execute/cus...|{"event": "order/...|
|f05040d0-d75c-43d...|2017-03-16T15:13:...|order/execute/cus...|{"event": "order/...|
|96729095-1923-422...|2017-03-15T16:26:...| node/review/created|{"event": "node/r...|
|aa7e25fd-b7d9-47b...|2017-02-21T13:12:...|order/execute/cus...|{"event": "order/...|
|b56e3a72-04e9-464...|2017-03-16T12:09:...|order/execute/cus...|{"event": "order/...|
|f0c0b5f6-b2fa-4c3...|2017-03-04T17:28:...|order/execute/cus...|{"event": "order/...|
|api-Jw0tXl91iPTg7...|2017-01-27T19:28:...|order/execu

                                                                                

###After get all data into a spark dataframe is time to exploid the data column

In [4]:
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, TimestampType, FloatType, DoubleType
#from pyspark.sql.functions import col,from_json
from pyspark.sql.functions import *

schema = StructType([
  StructField("event",StringType(),True), 
  StructField("hub_id",StringType(),True), 
  StructField("order_id",StringType(),True), 
  StructField("timestamp", TimestampType(), True),
  StructField("price_customer",StringType(),True),
  StructField("orderStationType",StringType(),True),
  StructField("orderStationModel",StringType(),True),
  StructField("context_traits_uid",StringType(),True),
  StructField("review_value_speed",StringType(),True),
  StructField("review_value_service",StringType(),True),
  StructField("context_traits_persona",StringType(),True),
  StructField("orderStationManufacterer",StringType(),True),
  StructField("review_value_communication",StringType(),True),
  StructField("review_value_print_quality",StringType(),True)
])
#using from_json to exploid data column
fulldf = df_spark.withColumn('data', from_json(col('data'),schema)).select('id','timestamp_datetime','name','data.*')
fulldf.show()


+--------------------+--------------------+--------------------+--------------------+------+--------+--------------------+--------------+----------------+--------------------+------------------+------------------+--------------------+----------------------+------------------------+--------------------------+--------------------------+
|                  id|  timestamp_datetime|                name|               event|hub_id|order_id|           timestamp|price_customer|orderStationType|   orderStationModel|context_traits_uid|review_value_speed|review_value_service|context_traits_persona|orderStationManufacterer|review_value_communication|review_value_print_quality|
+--------------------+--------------------+--------------------+--------------------+------+--------+--------------------+--------------+----------------+--------------------+------------------+------------------+--------------------+----------------------+------------------------+--------------------------+-----------------

### I used some spark sql to get only columns that will be worth keep to do the metrics calculations

In [5]:
#creating a tempView to work with sparkSql
fulldf.createOrReplaceTempView('fulldf_table')


## Selecting only the columns we need to the solution

### Creating a new dataframe with only columns that will be used to calculate metrics

In [6]:
df_sample = spark.sql('''
select 
id as log_id,
context_traits_uid as customer_id,
hub_id as supplier_id,
order_id,
timestamp as date,
event,
review_value_speed,
review_value_print_quality
from fulldf_table
''')
           
                      
df_sample.show()

+--------------------+-----------+-----------+--------+--------------------+--------------------+------------------+--------------------------+
|              log_id|customer_id|supplier_id|order_id|                date|               event|review_value_speed|review_value_print_quality|
+--------------------+-----------+-----------+--------+--------------------+--------------------+------------------+--------------------------+
|api-XJQFMJdJnp6E8...|     196042|      84241| 4031796| 2017-01-04 17:05:07|order/execute/cus...|              null|                      null|
|04998e62-44de-480...|      98562|      69607| 4473029|2017-03-11 15:10:...|order/execute/cus...|              null|                      null|
|fa356ead-ec8f-49e...|     192358|       4450| 4560418|2017-03-25 00:08:...|order/execute/cus...|              null|                      null|
|f05040d0-d75c-43d...|     192358|       4450| 4498287|2017-03-16 12:13:...|order/execute/cus...|              null|                    

###Improving the dataset before start

In [7]:
#in this step i'm formating the datetime and replacing some null values from review_scores to better calculate in the future
df_sample = df_sample.withColumn("date",date_format(col("date"),"yyyy-MM-dd HH:mm:ss"))
df_sample = df_sample.withColumn("calculated_at",date_format(col("date"),"yyyy-MM-dd"))
#df_sample=df_sample.na.fill({'review_value_speed':0, 'review_value_print_quality':0})

display(df_sample)

DataFrame[log_id: string, customer_id: string, supplier_id: string, order_id: string, date: string, event: string, review_value_speed: string, review_value_print_quality: string, calculated_at: string]

# Calculating the Metrics

## Calculating the average_rating

In [8]:
#Assumptions:
#First of all I'm presuming that the average_rating will be calculated as average regarding (review_value_speed + review_value_print_quality)2. In another words the average will be a number regarding both review values
#Another assumption I made:
# The average_rating is calculated by the last value given by a customer at the end of the day. That means if the first score was 80, but the customer update the score to 100, i'm going to considere this one. Evaluating by the last datetime. This also means that the customer delete his score as the last event of the day, this review will no longer be considered to calculate the average. I'm also not considering null values to calculate the final average.

In [9]:
#first selecting only rows that bring us reviews information
df_sample.createOrReplaceTempView('df_sample_table')
df_reviews = spark.sql('SELECT * FROM df_sample_table WHERE event LIKE "node/review%" ')
df_reviews.show()

+--------------------+-----------+-----------+--------+-------------------+-------------------+------------------+--------------------------+-------------+
|              log_id|customer_id|supplier_id|order_id|               date|              event|review_value_speed|review_value_print_quality|calculated_at|
+--------------------+-----------+-----------+--------+-------------------+-------------------+------------------+--------------------------+-------------+
|96729095-1923-422...|     213926|      62909| 4484946|2017-03-15 13:26:50|node/review/created|               100|                       100|   2017-03-15|
|6e14f9c2-a7f6-4b9...|     142364|      30352| 4291151|2017-03-29 14:41:46|node/review/created|               100|                       100|   2017-03-29|
|ec3ce13b-a90b-499...|     192358|       4450| 4560418|2017-04-04 11:18:31|node/review/created|               100|                       100|   2017-04-04|
|cd495436-565d-43f...|     208998|      34872| 4354608|2017-02-2

In [10]:
df_reviews.createOrReplaceTempView('df_reviews_table')

In [11]:
#creating a dataframe with the last update from each order by day
df_lastUpdate = spark.sql('''

Select t.* 
FROM
(SELECT *, --calculated_at,supplier_id, order_id, customer_id,date,event,log_id, 
ROW_NUMBER() OVER (PARTITION BY calculated_at,supplier_id, order_id, customer_id ORDER BY date desc ) as SEQ
FROM df_reviews_table 
) t
where SEQ = 1
and event <> "node/review/deleted"
--and (review_value_speed is not null or review_value_print_quality is not null)



'''
)
df_lastUpdate.show()

+--------------------+-----------+-----------+--------+-------------------+-------------------+------------------+--------------------------+-------------+---+
|              log_id|customer_id|supplier_id|order_id|               date|              event|review_value_speed|review_value_print_quality|calculated_at|SEQ|
+--------------------+-----------+-----------+--------+-------------------+-------------------+------------------+--------------------------+-------------+---+
|api-nMV61di2FJV8y...|     194699|       4450| 4006543|2017-01-02 19:45:18|node/review/created|               100|                       100|   2017-01-02|  1|
|api-5FCnL6WpMcj4w...|     194699|       4450| 4015913|2017-01-02 20:20:39|node/review/created|               100|                       100|   2017-01-02|  1|
|api-LOyj19YiaA6BN...|     165139|     100130| 3451200|2017-01-03 07:38:57|node/review/created|               100|                        80|   2017-01-03|  1|
|api-ygFGCfGFHVa9S...|     174934|     1

In [12]:
df_lastUpdate.createOrReplaceTempView('df_lastUpdate_table')

In [13]:
df_average_rating = spark.sql('''

--Second select to calculate average between the two values calculated beforeavg(speed) and avg(print_quality)
Select t1.calculated_at
, t1.supplier_id
, "average_rating" as metric
, format_number((t1.m1 + t1.m2)/2, '##.##') as value from 

(
--Fist Select to calculate a average for each review(speed and print_quality)
Select calculated_at,supplier_id,AVG(review_value_speed) as m1 ,AVG(review_value_print_quality) as m2 from df_lastUpdate_table group by 1,2 order by calculated_at) 
as t1
where (t1.m1 + t1.m2)/2 is not null


''')
df_average_rating.show()

+-------------+-----------+--------------+-----+
|calculated_at|supplier_id|        metric|value|
+-------------+-----------+--------------+-----+
|   2017-01-02|       4450|average_rating|  100|
|   2017-01-03|     165195|average_rating|  100|
|   2017-01-03|       4450|average_rating|  100|
|   2017-01-03|      30352|average_rating|  100|
|   2017-01-03|      34872|average_rating|   90|
|   2017-01-03|      84241|average_rating|  100|
|   2017-01-03|     100130|average_rating|   90|
|   2017-01-04|      30352|average_rating|  100|
|   2017-01-05|      84241|average_rating|  100|
|   2017-01-05|      69607|average_rating|   90|
|   2017-01-06|      30352|average_rating|  100|
|   2017-01-06|      84241|average_rating|  100|
|   2017-01-09|       4450|average_rating|  100|
|   2017-01-09|      30352|average_rating|  100|
|   2017-01-09|      62909|average_rating|   90|
|   2017-01-10|       4450|average_rating|  100|
|   2017-01-11|      30352|average_rating|  100|
|   2017-01-11|     

In [14]:
#CREATING A TABLE WITH METRIC 1

## Calculating the acceptance_ratio

In [15]:
#Assumptions:I'm assuming that when the supplier has no order received, they will not be able to get a acceptance_ratio for that day. Also i'm pressuming that when a order was not accepted in the same day it was received, it will not count on the following days.

In [16]:
#Using the dataframe we got with all columns we need to this solution
df_sample.show()

+--------------------+-----------+-----------+--------+-------------------+--------------------+------------------+--------------------------+-------------+
|              log_id|customer_id|supplier_id|order_id|               date|               event|review_value_speed|review_value_print_quality|calculated_at|
+--------------------+-----------+-----------+--------+-------------------+--------------------+------------------+--------------------------+-------------+
|api-XJQFMJdJnp6E8...|     196042|      84241| 4031796|2017-01-04 17:05:07|order/execute/cus...|              null|                      null|   2017-01-04|
|04998e62-44de-480...|      98562|      69607| 4473029|2017-03-11 15:10:28|order/execute/cus...|              null|                      null|   2017-03-11|
|fa356ead-ec8f-49e...|     192358|       4450| 4560418|2017-03-25 00:08:54|order/execute/cus...|              null|                      null|   2017-03-25|
|f05040d0-d75c-43d...|     192358|       4450| 4498287|201

In [17]:
#creating a tempview to work using spark sql
df_sample.createOrReplaceTempView('df_sample_table')

In [18]:
#First lets put a flag to better calculate when a order got the status processing(it happens when the supplier got)
df_flagged_sample = spark.sql('''
select *, 
case when event like "order/execute/customer/status/processing" then 1
else 0
end as received_order,
case when event like "order/execute/customer/status/payment" then 1
else 0
end order_accepted
from df_sample_table where event like "%order%"
''')

df_flagged_sample.show()

+--------------------+-----------+-----------+--------+-------------------+--------------------+------------------+--------------------------+-------------+--------------+--------------+
|              log_id|customer_id|supplier_id|order_id|               date|               event|review_value_speed|review_value_print_quality|calculated_at|received_order|order_accepted|
+--------------------+-----------+-----------+--------+-------------------+--------------------+------------------+--------------------------+-------------+--------------+--------------+
|api-XJQFMJdJnp6E8...|     196042|      84241| 4031796|2017-01-04 17:05:07|order/execute/cus...|              null|                      null|   2017-01-04|             1|             0|
|04998e62-44de-480...|      98562|      69607| 4473029|2017-03-11 15:10:28|order/execute/cus...|              null|                      null|   2017-03-11|             1|             0|
|fa356ead-ec8f-49e...|     192358|       4450| 4560418|2017-03-25

In [19]:
# In order to be able to build the logic to accept only orders that were accepted the same day were received, I had to group by date(or calculated_at column), supplier and order. After all of this I was able to aggregate to build the final metric

In [20]:
#Now let's use the new dataframe to calculate the acceptance_ratio 
df_flagged_sample.createOrReplaceTempView("flagged_sample_table")

In [21]:
df_flagged_sample.createOrReplaceTempView("flagged_sample_table")

In [22]:
#First used a group by to consolidate all events until order_id, this way is easy to see if the order events
#Second subselect was used only to filter days that the supplier got orders, and to avoid orders that were accepted by a diferent day that was received

In [23]:
df_acceptance_ratio = spark.sql('''

select t2.calculated_at,
t2.supplier_id
--,SUM(t2.order_accepted_qtd) as qtd_accepted
--,SUM(t2.received_order_qtd) as qtd_received
,"acceptance_ratio" as metric
,format_number(SUM(t2.order_accepted_qtd)/SUM(t2.received_order_qtd),'##.##%') as value
from (

select t.* --This subselect was used only to filter days that the supplier got orders, and to avoid orders that were accepted by a diferent day that was received

from (

select calculated_at --First used a group by to consolidate all events until order_id, this way is easy to see if the order events
,supplier_id
,order_id
,sum(received_order) as received_order_qtd
,sum(order_accepted) as order_accepted_qtd 
from flagged_sample_table 
group by 1,2,3 
order by 1
) t

where received_order_qtd >= order_accepted_qtd
and received_order_qtd = 1
)t2

group by 1,2 order by 1,2


''')


df_acceptance_ratio.show()

+-------------+-----------+----------------+------+
|calculated_at|supplier_id|          metric| value|
+-------------+-----------+----------------+------+
|   2017-01-01|       4450|acceptance_ratio|    0%|
|   2017-01-01|      84241|acceptance_ratio|    0%|
|   2017-01-02|      18815|acceptance_ratio|  100%|
|   2017-01-02|      30352|acceptance_ratio|   50%|
|   2017-01-02|      34872|acceptance_ratio|    0%|
|   2017-01-02|       4450|acceptance_ratio|   75%|
|   2017-01-03|       4450|acceptance_ratio|66.67%|
|   2017-01-03|      62909|acceptance_ratio|    0%|
|   2017-01-03|      69607|acceptance_ratio|66.67%|
|   2017-01-03|      84241|acceptance_ratio|66.67%|
|   2017-01-04|      34872|acceptance_ratio|    0%|
|   2017-01-04|       4450|acceptance_ratio|   50%|
|   2017-01-04|      69607|acceptance_ratio|    0%|
|   2017-01-04|      84241|acceptance_ratio|   75%|
|   2017-01-05|      30352|acceptance_ratio|  100%|
|   2017-01-05|      34872|acceptance_ratio|  100%|
|   2017-01-

In [24]:
df_supplier_score_metrics = df_average_rating.union(df_acceptance_ratio)
df_supplier_score_metrics.show()

+-------------+-----------+--------------+-----+
|calculated_at|supplier_id|        metric|value|
+-------------+-----------+--------------+-----+
|   2017-01-02|       4450|average_rating|  100|
|   2017-01-03|      34872|average_rating|   90|
|   2017-01-03|       4450|average_rating|  100|
|   2017-01-03|      84241|average_rating|  100|
|   2017-01-03|     100130|average_rating|   90|
|   2017-01-03|     165195|average_rating|  100|
|   2017-01-03|      30352|average_rating|  100|
|   2017-01-04|      30352|average_rating|  100|
|   2017-01-05|      84241|average_rating|  100|
|   2017-01-05|      69607|average_rating|   90|
|   2017-01-06|      84241|average_rating|  100|
|   2017-01-06|      30352|average_rating|  100|
|   2017-01-09|      62909|average_rating|   90|
|   2017-01-09|       4450|average_rating|  100|
|   2017-01-09|      30352|average_rating|  100|
|   2017-01-10|       4450|average_rating|  100|
|   2017-01-11|      84241|average_rating|  100|
|   2017-01-11|     

In [25]:
df_supplier_score_metrics.write.mode("overwrite").saveAsTable("supplier_score_metrics")
spark.sql("select * from supplier_score_metrics ").show()

                                                                                

+-------------+-----------+----------------+------+
|calculated_at|supplier_id|          metric| value|
+-------------+-----------+----------------+------+
|   2017-01-01|       4450|acceptance_ratio|    0%|
|   2017-01-01|      84241|acceptance_ratio|    0%|
|   2017-01-02|      18815|acceptance_ratio|  100%|
|   2017-01-02|      30352|acceptance_ratio|   50%|
|   2017-01-02|      34872|acceptance_ratio|    0%|
|   2017-01-02|       4450|acceptance_ratio|   75%|
|   2017-01-03|       4450|acceptance_ratio|66.67%|
|   2017-01-03|      62909|acceptance_ratio|    0%|
|   2017-01-03|      69607|acceptance_ratio|66.67%|
|   2017-01-03|      84241|acceptance_ratio|66.67%|
|   2017-01-04|      34872|acceptance_ratio|    0%|
|   2017-01-04|       4450|acceptance_ratio|   50%|
|   2017-01-04|      69607|acceptance_ratio|    0%|
|   2017-01-04|      84241|acceptance_ratio|   75%|
|   2017-01-05|      30352|acceptance_ratio|  100%|
|   2017-01-05|      34872|acceptance_ratio|  100%|
|   2017-01-

In [26]:
spark.sql("show tables").show()

+---------+--------------------+-----------+
|namespace|           tableName|isTemporary|
+---------+--------------------+-----------+
|  default|supplier_score_me...|      false|
|         | df_lastupdate_table|       true|
|         |    df_reviews_table|       true|
|         |     df_sample_table|       true|
|         |flagged_sample_table|       true|
|         |        fulldf_table|       true|
+---------+--------------------+-----------+

