### Description of datasets

channels: This dataset contains information about the sales channels (marketplaces) where our store owners sell their goods and food.

deliveries: This dataset contains information about the deliveries made by our partner delivery drivers.

drivers: This dataset contains information about the partner delivery drivers. They are stationed at our hubs, and every time an order is processed, they make the deliveries to the consumers' homes.

hubs: This dataset contains information about the Delivery Center hubs. Hubs are the distribution centers for orders, and deliveries originate from there.

orders: This dataset contains information about the sales processed through the Delivery Center platform.

payments: This dataset contains information about the payments made to the Delivery Center.

stores: This dataset contains information about the store owners. They use the Delivery Center platform to sell their items (goods and/or food) on the marketplaces.

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

In [3]:
spark = SparkSession.builder.appName("hdfs_test").getOrCreate()

24/04/23 11:10:44 WARN Utils: Your hostname, MacBook-Air-Paulina.local resolves to a loopback address: 127.0.0.1; using 192.168.33.5 instead (on interface en0)
24/04/23 11:10:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/23 11:10:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
files = ['channels', 'deliveries', 'drivers', 'hubs', 'orders', 'payments', 'stores']
dfs = {}

for file in files:
    dfs[file] = spark.read.csv("hdfs://localhost:9000/user/maciek/"+file+".csv", header=True, inferSchema=True)
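
The read loop above builds each HDFS URI by string concatenation. One way to keep the path logic in a single place is a small helper; the sketch below is plain Python, reuses the base URI from the cell above, and `hdfs_csv_path` is a hypothetical name introduced here for illustration (it is not part of PySpark).

```python
# Hypothetical helper: builds the CSV URI for one dataset name.
# BASE_URI is copied from the read loop above; adjust it for another cluster.
BASE_URI = "hdfs://localhost:9000/user/maciek"

DATASETS = ['channels', 'deliveries', 'drivers', 'hubs', 'orders', 'payments', 'stores']


def hdfs_csv_path(name: str, base_uri: str = BASE_URI) -> str:
    """Return the full URI of `<name>.csv` under `base_uri`."""
    return f"{base_uri.rstrip('/')}/{name}.csv"


# One URI per dataset, keyed by dataset name.
paths = {name: hdfs_csv_path(name) for name in DATASETS}
print(paths['channels'])
```

Each resulting path could then be passed to `spark.read.csv(path, header=True, inferSchema=True)` exactly as in the loop.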

                                                                                

In [5]:
channels = dfs['channels']
channels.show()

+----------+--------------+------------+
|channel_id|  channel_name|channel_type|
+----------+--------------+------------+
|         1|   OTHER PLACE| OWN CHANNEL|
|         2|   PHONE PLACE| OWN CHANNEL|
|         3|   WHATS PLACE| OWN CHANNEL|
|         4|    FACE PLACE| OWN CHANNEL|
|         5|    FOOD PLACE| MARKETPLACE|
|         6|   STORE PLACE| OWN CHANNEL|
|         7|  BERLIN PLACE| OWN CHANNEL|
|         8|  MADRID PLACE| OWN CHANNEL|
|         9|   THINK PLACE| OWN CHANNEL|
|        10|  LISBON PLACE| OWN CHANNEL|
|        11|   SUPER PLACE| OWN CHANNEL|
|        12|     ALL PLACE| MARKETPLACE|
|        13|VELOCITY PLACE| MARKETPLACE|
|        15|    EATS PLACE| MARKETPLACE|
|        17|   SHOPP PLACE| MARKETPLACE|
|        20|  MUNICH PLACE| MARKETPLACE|
|        21|  LONDON PLACE| MARKETPLACE|
|        23|  ATCHIN PLACE| MARKETPLACE|
|        24|    FULL PLACE| MARKETPLACE|
|        25|      ON PLACE| MARKETPLACE|
+----------+--------------+------------+
only showing top

In [6]:
from pyspark.sql import functions as F


print(f"channel_name number: {channels.select('channel_name').distinct().count()}\n")

# Number of occurrences of each unique value
grouped_data = channels.groupBy('channel_name').agg(F.count('*').alias('count'))
print(f"unique values in channel name column: {grouped_data.count()}\n")

# Null count within each channel_name group. Because the grouping column is the
# column being checked, only a NULL group could ever report a non-zero count.
nul_vals_cn = channels.groupBy('channel_name').agg(F.sum(F.col('channel_name').isNull().cast("int")).alias("null_count")).collect()
for row in nul_vals_cn:
    print(f"{row['channel_name']} null values: {row['null_count']}")

# Total number of null values in each column
tot_nan_vals = channels.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in channels.columns)).collect()[0].asDict()
print(f"\nnumber of null values in each column:\n {tot_nan_vals}")
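
Grouping by `channel_name` and then counting nulls in `channel_name` can only report a non-zero count for the NULL group itself, which is why every named channel in this notebook's output shows 0. The plain-Python sketch below (no Spark needed; the sample rows are made up purely for illustration) mimics that per-group aggregation next to a per-column null count, which is usually the more informative check.

```python
from collections import defaultdict

# Made-up sample rows standing in for the channels table (None plays the role of NULL).
rows = [
    {"channel_name": "FOOD PLACE", "channel_type": "MARKETPLACE"},
    {"channel_name": "FACE PLACE", "channel_type": "OWN CHANNEL"},
    {"channel_name": None, "channel_type": "OWN CHANNEL"},
    {"channel_name": "FOOD PLACE", "channel_type": None},
]

# Per-group null count: group by channel_name, count nulls of channel_name.
nulls_per_group = defaultdict(int)
for row in rows:
    nulls_per_group[row["channel_name"]] += int(row["channel_name"] is None)

# Per-column null count: one number per column across the whole table.
nulls_per_column = {
    col: sum(row[col] is None for row in rows) for col in rows[0]
}

print(dict(nulls_per_group))
print(nulls_per_column)
```

In this toy data only the `None` group gets a non-zero per-group count, while the per-column count also reveals the missing `channel_type`.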


channel_name number: 40

unique values in channel name column: 40

FOOD PLACE null values: 0
FACE PLACE null values: 0
LONGO PLACE null values: 0
CENTER PLACE null values: 0
ALL PLACE null values: 0
RONALD PLACE null values: 0
FULL PLACE null values: 0
PORTO PLACE null values: 0
SUPER PLACE null values: 0
VELOCITY PLACE null values: 0
AHORA PLACE null values: 0
SEARCH PLACE null values: 0
GLUB PLACE null values: 0
CAICAI PLACE null values: 0
OTHER PLACE null values: 0
BERLIN PLACE null values: 0
BEATLES PLACE null values: 0
ON PLACE null values: 0
SPEED PLACE null values: 0
LONDON PLACE null values: 0
WEAR PLACE null values: 0
SAN PLACE null values: 0
LISBON PLACE null values: 0
SHOPP PLACE null values: 0
RIBA PLACE null values: 0
STORE PLACE null values: 0
MUNICH PLACE null values: 0
REGISTER PLACE null values: 0
OWN PLACE null values: 0
MADRID PLACE null values: 0
THINK PLACE null values: 0
EATS PLACE null values: 0
ATCHIN PLACE null values: 0
OFF PLACE null values: 0
BRAZIL PLACE nu

In [7]:
deliveries = dfs['deliveries']
deliveries.describe().show()
deliveries.show()

24/04/23 11:11:14 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+-----------------+-------------------+------------------+------------------------+---------------+
|summary|      delivery_id|  delivery_order_id|         driver_id|delivery_distance_meters|delivery_status|
+-------+-----------------+-------------------+------------------+------------------------+---------------+
|  count|           378843|             378843|            362957|                  378770|         378843|
|   mean| 2634216.36283896|8.220422339218093E7| 21001.77764308169|      10721.360044354093|           NULL|
| stddev|264655.3209144291|  6996909.141038616|15520.652243004235|       231539.1894589949|           NULL|
|    min|          2174658|           68409030|               133|                       0|      CANCELLED|
|    max|          3144739|           93139817|             66459|                 7251291|     DELIVERING|
+-------+-----------------+-------------------+------------------+------------------------+---------------+

+-----------+---------

In [8]:
print(f"delivery status distinct number: {deliveries.select('delivery_status').distinct().count()}\n")

# Number of occurrences of each unique value
grouped_data = deliveries.groupBy('delivery_status').agg(F.count('*').alias('count'))
print(f"unique values in delivery status column: {grouped_data.count()}\n")

# Total number of null values in each column
tot_nan_vals = deliveries.select(*(F.sum(F.col(c).isNull().cast("int")).alias(c) for c in deliveries.columns)).collect()[0].asDict()
print(type(tot_nan_vals))
for key, val in tot_nan_vals.items():
    print(f"number of null values in  column {key}: {val}\n")




CodeCache: size=131072Kb used=34065Kb max_used=34209Kb free=97006Kb
 bounds [0x00000001061e8000, 0x0000000108398000, 0x000000010e1e8000]
 total_blobs=12602 nmethods=11598 adapters=914
 compilation: disabled (not enough contiguous free space left)
delivery status distinct number: 3

unique values in delivery status column: 3

<class 'dict'>
number of null values in  column delivery_id: 0

number of null values in  column delivery_order_id: 0

number of null values in  column driver_id: 15886

number of null values in  column delivery_distance_meters: 73

number of null values in  column delivery_status: 0
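
The null counts above can be cross-checked against the `count` row of the earlier `describe()` output, and turned into shares of the table. The sketch below is plain Python and only reuses figures printed in this notebook; the variable names are illustrative.

```python
# Figures copied from the outputs above.
total_rows = 378843  # `count` of delivery_id in describe(); that column has no nulls
null_counts = {
    "delivery_id": 0,
    "delivery_order_id": 0,
    "driver_id": 15886,
    "delivery_distance_meters": 73,
    "delivery_status": 0,
}

# Non-null rows per column; these should match the `count` row of describe().
non_null = {col: total_rows - n for col, n in null_counts.items()}

# Share of missing values per column, in percent.
null_pct = {col: 100 * n / total_rows for col, n in null_counts.items()}

for col in null_counts:
    print(f"{col}: {non_null[col]} non-null, {null_pct[col]:.2f}% null")
```

The derived non-null counts for `driver_id` (362957) and `delivery_distance_meters` (378770) agree with `describe()`, and roughly 4.19% of deliveries have no driver assigned.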



In [10]:
hubs = dfs['hubs']
hubs.describe().show()
hubs.show(32)

+-------+----------------+---------------+---------+---------+-----------------+-----------------+
|summary|          hub_id|       hub_name| hub_city|hub_state|     hub_latitude|    hub_longitude|
+-------+----------------+---------------+---------+---------+-----------------+-----------------+
|  count|              32|             32|       32|       32|               32|               32|
|   mean|        34.40625|           NULL|     NULL|     NULL|-24.4207382740625|   -46.5114598125|
| stddev|24.9035032838043|           NULL|     NULL|     NULL|2.289950688834545|2.668164650441109|
|    min|               2|AVENUE SHOPPING| CURITIBA|       PR|       -30.085743|       -51.245997|
|    max|              91|  WOLF SHOPPING|SÃO PAULO|       SP|      -22.8858199|      -43.1821807|
+-------+----------------+---------------+---------+---------+-----------------+-----------------+

+------+----------------+--------------+---------+------------+-------------+
|hub_id|        hub_name|

In [11]:
drivers = dfs['drivers']
drivers.describe().show()
drivers.show()

+-------+------------------+------------+-----------------+
|summary|         driver_id|driver_modal|      driver_type|
+-------+------------------+------------+-----------------+
|  count|              4824|        4824|             4824|
|   mean|29681.790008291875|        NULL|             NULL|
| stddev| 18391.79806919864|        NULL|             NULL|
|    min|               133|       BIKER|        FREELANCE|
|    max|             66494|     MOTOBOY|LOGISTIC OPERATOR|
+-------+------------------+------------+-----------------+

+---------+------------+-----------------+
|driver_id|driver_modal|      driver_type|
+---------+------------+-----------------+
|      133|     MOTOBOY|LOGISTIC OPERATOR|
|      138|     MOTOBOY|        FREELANCE|
|      140|     MOTOBOY|        FREELANCE|
|      143|       BIKER|        FREELANCE|
|      148|     MOTOBOY|        FREELANCE|
|      165|     MOTOBOY|        FREELANCE|
|      172|     MOTOBOY|        FREELANCE|
|      174|       BIKER|

In [12]:
d_modal_vls = drivers.select('driver_modal').distinct().count()
print(f"unique values in driver modal column: {d_modal_vls}")

unique values in driver modal column: 2


In [17]:
orders = dfs['orders']
# orders.describe().show()
print(len(orders.columns))
print(orders.columns)

29
['order_id', 'store_id', 'channel_id', 'payment_order_id', 'delivery_order_id', 'order_status', 'order_amount', 'order_delivery_fee', 'order_delivery_cost', 'order_created_hour', 'order_created_minute', 'order_created_day', 'order_created_month', 'order_created_year', 'order_moment_created', 'order_moment_accepted', 'order_moment_ready', 'order_moment_collected', 'order_moment_in_expedition', 'order_moment_delivering', 'order_moment_delivered', 'order_moment_finished', 'order_metric_collected_time', 'order_metric_paused_time', 'order_metric_production_time', 'order_metric_walking_time', 'order_metric_expediton_speed_time', 'order_metric_transit_time', 'order_metric_cycle_time']
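
The column list suggests how `orders` ties the other datasets together. The sketch below records candidate join keys as plain Python data; the relationships are inferred only from matching column names seen in this notebook's outputs and are an assumption, not something the source confirms. `CANDIDATE_KEYS` and `keys_for` are hypothetical names.

```python
# Candidate join keys, inferred only from matching column names (an assumption).
# Each entry: (left table, left column, right table, right column)
CANDIDATE_KEYS = [
    ("orders", "store_id", "stores", "store_id"),
    ("orders", "channel_id", "channels", "channel_id"),
    ("orders", "payment_order_id", "payments", "payment_order_id"),
    ("orders", "delivery_order_id", "deliveries", "delivery_order_id"),
    ("deliveries", "driver_id", "drivers", "driver_id"),
    ("stores", "hub_id", "hubs", "hub_id"),
]


def keys_for(table: str) -> list:
    """Return the candidate relationships that involve `table` on either side."""
    return [k for k in CANDIDATE_KEYS if table in (k[0], k[2])]


for left, lcol, right, rcol in keys_for("orders"):
    print(f"{left}.{lcol} -> {right}.{rcol}")
```

Before relying on any of these keys, it would be worth checking uniqueness and null counts on both sides, as done for the individual tables above.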


In [19]:
payments = dfs['payments']
payments.describe().show()
payments.show()

+-------+--------------+
|summary|payment_status|
+-------+--------------+
|  count|        400834|
|   mean|          NULL|
| stddev|          NULL|
|    min|      AWAITING|
|    max|          PAID|
+-------+--------------+

+----------+----------------+--------------+-----------+--------------+--------------+
|payment_id|payment_order_id|payment_amount|payment_fee|payment_method|payment_status|
+----------+----------------+--------------+-----------+--------------+--------------+
|   4427917|        68410055|        118.44|        0.0|       VOUCHER|          PAID|
|   4427918|        68410055|        394.81|        7.9|        ONLINE|          PAID|
|   4427941|        68412721|        206.95|       5.59|        ONLINE|          PAID|
|   4427948|        68413340|          58.8|       1.59|        ONLINE|          PAID|
|   4427955|        68414018|          45.8|       0.92|        ONLINE|          PAID|
|   4427956|        68414309|         106.8|       2.88|        ONLINE|  

In [25]:
status_un = payments.select('payment_status').distinct().count()
print(status_un)
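# Null count within each payment_status group; since the grouping column is the
# column being checked, only a NULL group could ever report a non-zero count.
vls_occ = payments.groupBy('payment_status').agg(F.sum(F.col('payment_status').isNull().cast("int")).alias('null_vls')).collect()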
for row in vls_occ:
    print(f"number of null values in {row['payment_status']} column: {row['null_vls']}")

3
number of null values in CHARGEBACK column: 0
number of null values in PAID column: 0
number of null values in AWAITING column: 0


In [26]:
stores = dfs['stores']
stores.describe().show()
stores.show()

+-------+------------------+------------------+--------------+-------------+-----------------+------------------+------------------+
|summary|          store_id|            hub_id|    store_name|store_segment| store_plan_price|    store_latitude|   store_longitude|
+-------+------------------+------------------+--------------+-------------+-----------------+------------------+------------------+
|  count|               951|               951|           951|          951|              836|               935|               935|
|   mean|2080.5552050473184| 30.37539432176656|          NULL|         NULL| 34.8165311004786|-23.88641021835297|-46.04286869650257|
| stddev|1338.5905906279252|18.961983691118437|          NULL|         NULL|14.30408567487769|1.5583846446157017|2.2980003371948903|
|    min|                 3|                 2|ACA PRISSURAU |         FOOD|              0.0|        -30.085743|        -51.245997|
|    max|              4679|                91|�RPIR PILMACI |       

In [28]:
shops = stores.groupBy('store_segment').agg(F.count(
print(shops)

[Row(store_segment='FOOD', shops_num=None), Row(store_segment='GOOD', shops_num=None)]


In [10]:
spark.stop()

ConnectionRefusedError: [Errno 61] Connection refused