In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pyspark.sql.functions as fn
from pyspark.conf import SparkConf

In [3]:
sc = SparkSession.builder\
    .config("spark.jars", "/home/sayyor/postgresql-42.6.0.jar")\
    .config("spark.driver.extraClassPath", "/home/sayyor/postgresql-42.6.0.jar")\
    .config("spark.executor.extraClassPath", "/home/sayyor/postgresql-42.6.0.jar")\
    .getOrCreate()
sqlContext = SQLContext(sc)



In [None]:
users.drop("Unnamed: 0").write.format("jdbc")\
    .option("url", "jdbc:postgresql://localhost:5432/bdm_joint")\
    .option("driver", "org.postgresql.Driver").option("dbtable", "users")\
    .option("user", "bdm").option("password", "test123")\
    .option("driver", "org.postgresql.Driver").save()

In [42]:
users = sqlContext.read.format("parquet").load("hdfs://localhost:9900/input/users.parquet", header='true', inferSchema='true')

In [43]:
addresses = sqlContext.read.format("parquet").load("hdfs://localhost:9900/input/addresses.parquet", header='true', inferSchema='true')

In [44]:
result = users.join(addresses, users.user_id == addresses.id).drop(
        "Unnamed: 0", "id", "firstname", "lastname", "mobile", "streetNumber",
        "buildingNumber", "postalCode", "provence", "country", "streetName"
    )

In [45]:
result = result.withColumn("city", fn.when(fn.col("city") == 'Palma de Mallorca', 'Palma').otherwise(fn.col("city")))

In [24]:
# Change city names for all appropriate tables
city_dist = sqlContext.read.format("parquet").load("hdfs://localhost:9900/input/cities_distances.parquet", header='true', inferSchema='true')

In [25]:
city_dist = city_dist.withColumn("name1", fn.when(fn.col("name1") == 'Alacant / Alicante', 'Alacant').otherwise(fn.col("name1")))
city_dist = city_dist.withColumn("name2", fn.when(fn.col("name2") == 'Alacant / Alicante', 'Alacant').otherwise(fn.col("name2")))

In [26]:
city_dist = city_dist.drop("latitude1", "longitude1", "latitude2", "longitude2")

In [55]:
flights = sqlContext.read.format("parquet").load("hdfs://localhost:9900/input/scheduledFlights.parquet", header='true', inferSchema='true')

In [57]:
travels = sqlContext.read.format("parquet").load("hdfs://localhost:9900/input/travels.parquet", header='true', inferSchema='true')

In [12]:
city_dist.select('name2').distinct().show()

+---------+
|    name2|
+---------+
|   Málaga|
|Barcelona|
|  Alacant|
|    Palma|
|  Sevilla|
+---------+



In [81]:
travels = travels.withColumn("departureAirportFsCode",
                    fn.when(fn.col("departureAirportFsCode") == 'MAD', 'Madrid')
                    .otherwise(fn.when(fn.col("departureAirportFsCode") == 'BCN', 'Barcelona')
                    .otherwise(fn.when(fn.col("departureAirportFsCode") == 'PMI', 'Palma')
                    .otherwise(fn.when(fn.col("departureAirportFsCode") == 'AGP', 'Málaga')
                    .otherwise(fn.col("departureAirportFsCode"))))))

travels = travels.withColumn("arrivalAirportFsCode",
                    fn.when(fn.col("arrivalAirportFsCode") == 'MAD', 'Madrid')
                    .otherwise(fn.when(fn.col("arrivalAirportFsCode") == 'BCN', 'Barcelona')
                    .otherwise(fn.when(fn.col("arrivalAirportFsCode") == 'PMI', 'Palma')
                    .otherwise(fn.when(fn.col("arrivalAirportFsCode") == 'AGP', 'Málaga')
                    .otherwise(fn.col("arrivalAirportFsCode"))))))

In [13]:
products = sqlContext.read.format("parquet").load("hdfs://localhost:9900/input/products.parquet", header='true', inferSchema='true')

In [16]:
products = products.drop("product_image_link")

In [21]:
gas = sqlContext.read.format("parquet").load("hdfs://localhost:9900/input/gas.parquet", header='true', inferSchema='true')

In [28]:
city_dist = city_dist.join(gas, city_dist.country1 == gas.country)

In [32]:
city_dist = city_dist.withColumn("lpg_price", city_dist["lpg"]*(city_dist['distance_km']/100)*8)

In [34]:
city_dist = city_dist.withColumn("diesel_price", city_dist["diesel"]*(city_dist['distance_km']/100)*5)
city_dist = city_dist.withColumn("gasoline_price", city_dist["gasoline"]*(city_dist['distance_km']/100)*6.3)

In [37]:
city_dist = city_dist.drop("country", "gasoline", "diesel", "lpg")

In [59]:
requests = sqlContext.read.format("parquet").load("hdfs://localhost:9900/input/requests.parquet", header='true', inferSchema='true')

In [None]:
requests.drop("description")

In [83]:
dhl_price = sqlContext.read.format("parquet").load("hdfs://localhost:9900/input/DHL_Price.parquet", header='true', inferSchema='true')
dhl_zone = sqlContext.read.format("parquet").load("hdfs://localhost:9900/input/DHL_Zone.parquet", header='true', inferSchema='true')
dhl_zone = dhl_zone.withColumn("Region_start", fn.when(fn.col("Region_start") == 'Palma De Mallorca (PMI)', 'Palma').otherwise(fn.col("Region_start")))
dhl_zone = dhl_zone.withColumn("Region_end", fn.when(fn.col("Region_end") == 'Palma De Mallorca (PMI)', 'Palma').otherwise(fn.col("Region_end")))

In [61]:
requests.show()

+--------------------+----------------+-----------+--------------------+-------------------+-------------------+-------------------+-------------+-----------------+-----------+-----------+----------+-------+-----------+
|initializationUserId|collectionUserId|travellerId|           productId|      dateToDeliver|      dateDelivered|        requestDate|pickUpAddress|collectionAddress|description|deliveryFee|pickupCity|address|collectCity|
+--------------------+----------------+-----------+--------------------+-------------------+-------------------+-------------------+-------------+-----------------+-----------+-----------+----------+-------+-----------+
|                  20|              25|       null|dc582e9ac5036846a...|04/20/2023 05:32 PM|               null|04/10/2023 05:32 PM|           20|               25|       null|       null|    Madrid|     25|     Málaga|
|                  20|              37|        0.0|bfd9a39f1774d4ca8...|05/03/2023 01:05 AM|04/26/2023 03:44 AM|04/23/20

In [47]:
result = result.drop("gender", "nationality", "dob", "is_traveller", "user_id")

In [60]:
requests = requests.join(result, result.address == requests.pickUpAddress)
requests = requests.withColumnRenamed("city", 'pickupCity').drop("address")
requests = requests.join(result, result.address == requests.collectionAddress)
requests = requests.withColumnRenamed("city", 'collectCity')

In [84]:
dhl_zone_new = dhl_zone.withColumn("Region_start", fn.when(fn.col("Region_start") == "Resto de España (ES)", "Málaga").otherwise(fn.col("Region_start")))
dhl_zone_new = dhl_zone_new.union(dhl_zone.withColumn("Region_start", fn.when(fn.col("Region_start") == "Resto de España (ES)", "Barcelona").otherwise(fn.col("Region_start"))))
dhl_zone_new = dhl_zone_new.union(dhl_zone.withColumn("Region_start", fn.when(fn.col("Region_start") == "Resto de España (ES)", "Madrid").otherwise(fn.col("Region_start"))))

dhl_zone = dhl_zone_new.withColumn("Region_end", fn.when(fn.col("Region_end") == "Resto de España (ES)", "Málaga").otherwise(fn.col("Region_end")))
dhl_zone = dhl_zone.union(dhl_zone_new.withColumn("Region_end", fn.when(fn.col("Region_end") == "Resto de España (ES)", "Barcelona").otherwise(fn.col("Region_end"))))
dhl_zone = dhl_zone.union(dhl_zone_new.withColumn("Region_end", fn.when(fn.col("Region_end") == "Resto de España (ES)", "Madrid").otherwise(fn.col("Region_end"))))

In [92]:
dhl_zone = dhl_zone.drop("Zone_start", "Zone_end")

In [94]:
requests = requests.join(dhl_zone, (dhl_zone.Region_start == requests.collectCity) & (dhl_zone.Region_end == requests.pickupCity))


In [103]:
requests = requests.drop("Region_start", "Region_end", "address")

In [99]:
requests = requests.join(products, products.product_id == requests.productId)

DataFrame[initializationUserId: bigint, collectionUserId: bigint, travellerId: double, productId: string, dateToDeliver: string, dateDelivered: string, requestDate: string, pickUpAddress: bigint, collectionAddress: bigint, description: double, deliveryFee: double, pickupCity: string, address: bigint, collectCity: string, Region_start: string, Region_end: string, Type: string, product_weight_g: double]

In [None]:
requests = requests.join(dhl_price, dhl_price.product_id == requests.productId)

In [100]:
requests = requests.drop("product_id")

In [115]:
row_idx = fn.ceil(requests['product_weight_g'])

In [112]:
row_idx.show()

TypeError: 'Column' object is not callable

In [113]:
requests = requests.withColumn("DHL_price", dhl_price.collect()[row_idx][requests.Type])

TypeError: list indices must be integers or slices, not Column

In [104]:
requests.show()

+--------------------+----------------+-----------+--------------------+-------------------+-------------------+-------------------+-------------+-----------------+-----------+-----------+----------+-----------+----+----------------+
|initializationUserId|collectionUserId|travellerId|           productId|      dateToDeliver|      dateDelivered|        requestDate|pickUpAddress|collectionAddress|description|deliveryFee|pickupCity|collectCity|Type|product_weight_g|
+--------------------+----------------+-----------+--------------------+-------------------+-------------------+-------------------+-------------+-----------------+-----------+-----------+----------+-----------+----+----------------+
|                  20|              25|       null|dc582e9ac5036846a...|04/20/2023 05:32 PM|               null|04/10/2023 05:32 PM|           20|               25|       null|       null|    Madrid|     Málaga|   A|          1300.0|
|                  20|              37|        0.0|bfd9a39f1774d

In [97]:
products = products.drop("product_length_cm", "product_height_cm", "product_width_cm", "product_category_name_english", "product_name")

In [107]:
dhl_price.collect()[0]['A']

16.78

In [None]:
dhl_price.collect

In [105]:
dhl_price.show()

+------+-----+-----+-----+------+
|Weight|    A|    B|    C|     D|
+------+-----+-----+-----+------+
|     1|16.78|17.05|20.23|  40.1|
|     2|16.78|17.05|20.23| 58.07|
|     3|18.43|18.86| 25.0| 66.03|
|     4|20.08|20.67|29.77| 73.99|
|     5|21.73|22.48|34.54| 81.95|
|     6|25.86|26.94|37.97| 89.91|
|     7|28.15|29.55| 41.4| 97.88|
|     8|30.44|32.16|44.83|105.85|
|     9|32.73|34.77|48.26|113.82|
|    10|35.02|37.38|51.69|121.79|
|    11|37.31|39.99| 54.7|129.75|
|    12| 39.6| 42.6|57.71|137.71|
|    13|41.89|45.21|60.72|145.67|
|    14|44.18|47.82|63.73|153.63|
|    15|46.47|50.43|66.74|161.59|
|    16|48.76|53.04|69.75|169.55|
|    17|51.05|55.65|72.76|177.51|
|    18|53.34|58.26|75.77|185.47|
|    19|55.63|60.87|78.78|193.43|
|    20|57.92|63.48|81.79|201.39|
+------+-----+-----+-----+------+
only showing top 20 rows



In [None]:
products = products.drop("product_weight_g")

In [1]:
# convert from csv to parquet
import pandas as pd
# folder = "../additional_sources/"
folder = "../datasets/"
files = ["DHL_Processed_Domestic_Price.parquet", ""]
for file in files:
    df = pd.read_csv(folder+file)
    df.to_parquet(folder+file.split(".")[0]+'.parquet')