# Calculating the distance between the Customer's city and the Seller's city

In [None]:
from pyspark.sql import SparkSession, functions as F
import math

In [None]:
# Reuse the already-active SparkSession if there is one; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

In [None]:
def _load_csv(path):
    """Read one CSV file into a DataFrame using the notebook's shared options.

    A double quote is used both as the quote and as the escape character,
    the first row is treated as the header, quoted fields may span several
    lines, and column types are inferred from the data.
    """
    reader = spark.read.option('escape', '\"').option('quote', '\"')
    return reader.csv(path, header=True, multiLine=True, inferSchema=True)


orders_items_df = _load_csv('./dataset/olist_order_items_dataset.csv')
orders_df = _load_csv('./dataset/olist_orders_dataset.csv')
customers_df = _load_csv('./dataset/olist_customers_dataset.csv')
sellers_df = _load_csv('./dataset/olist_sellers_dataset.csv')
geo_df = _load_csv('./dataset/olist_geolocation_dataset.csv')

# Joining the data

In [3]:
# Keep delivered orders only and attach each order's customer attributes.
delivered_df = orders_df.where(F.col('order_status') == 'delivered')
delivered_with_customer_df = delivered_df.join(customers_df, 'customer_id')

# One row per order item, carrying the customer and seller location columns
# plus the freight value.
location_columns = [
    'customer_state',
    'customer_city',
    'customer_zip_code_prefix',
    'seller_state',
    'seller_city',
    'seller_zip_code_prefix',
    'freight_value',
]
items_df = orders_items_df.join(delivered_with_customer_df, 'order_id')
items_df = items_df.join(sellers_df, 'seller_id')
data_df = items_df.select(*location_columns)

# Resolve the customer's zip code prefix to coordinates.
# NOTE(review): if geo_df holds several rows per zip code prefix, each of the
# two joins below multiplies the matching rows -- confirm this is intended.
customer_zip_matches = \
    data_df.customer_zip_code_prefix == geo_df.geolocation_zip_code_prefix
with_customer_coords_df = data_df.join(geo_df, customer_zip_matches).select(
    F.col('geolocation_lat').alias('customer_lat'),
    F.col('geolocation_lng').alias('customer_lng'),
    'seller_state',
    'seller_city',
    'seller_zip_code_prefix',
    'freight_value',
)

# Resolve the seller's zip code prefix to coordinates in the same way.
seller_zip_matches = \
    data_df.seller_zip_code_prefix == geo_df.geolocation_zip_code_prefix
data_df = with_customer_coords_df.join(geo_df, seller_zip_matches).select(
    'customer_lat',
    'customer_lng',
    F.col('geolocation_lat').alias('seller_lat'),
    F.col('geolocation_lng').alias('seller_lng'),
    'freight_value',
)

# Calculating distance

In [None]:
def d(c_lat, c_lng, s_lat, s_lng):
    """Return the haversine (great-circle) distance in km between two points.

    Args:
        c_lat: Latitude of the first point, in decimal degrees.
        c_lng: Longitude of the first point, in decimal degrees.
        s_lat: Latitude of the second point, in decimal degrees.
        s_lng: Longitude of the second point, in decimal degrees.

    Returns:
        The distance in kilometres as a float, or None when any coordinate
        is None (so that null inputs yield a null result instead of raising).
    """
    if c_lat is None or c_lng is None or s_lat is None or s_lng is None:
        return None

    radius = 6371  # km

    sin_half_dlat = math.sin(math.radians(s_lat - c_lat) / 2)
    sin_half_dlon = math.sin(math.radians(s_lng - c_lng) / 2)
    a = sin_half_dlat * sin_half_dlat + math.cos(math.radians(c_lat)) \
        * math.cos(math.radians(s_lat)) * sin_half_dlon * sin_half_dlon
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    central_angle = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return radius * central_angle

# Declare the UDF's return type explicitly. Without it F.udf defaults to a
# string return type, and the max() below would compare the distances
# lexicographically as text instead of numerically.
distance = F.udf(d, 'double')

data_df = data_df.withColumn(
    'distance',
    distance('customer_lat', 'customer_lng', 'seller_lat', 'seller_lng'))
# data_df.limit(100).toPandas()
# Aggregate over the whole DataFrame (no grouping keys): the largest distance.
data_df.groupBy().agg(F.max(F.col('distance'))).show()