In [1]:
from datetime import datetime, date
import datetime as dt
import time
from pyspark.sql import DataFrame
import pandas as pd
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.functions import current_timestamp, from_unixtime
import random
import string

In [2]:
def timestamp_now():
    return round(datetime.utcnow().timestamp() * 1000)

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [4]:
def create_users(dataset, n_client=200):

    id_start = 0
    
    if dataset != None:
        id_start = dataset.agg({'id': 'max'}).collect()[0]['max(id)'] + 1
    rows = []
    
    for i in range(n_client):
        firstName = ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5,25)))
        lastName = ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5,25)))
        row = (id_start, firstName, lastName)
        rows.append(row)
        id_start += 1
    
    df = spark.createDataFrame(rows, schema='Id bigint, FirstName string, LastName string')
    
    if dataset != None:
        df = DataFrame.unionAll(dataset, df)
        
    return df

In [29]:
df = create_users(None, 1000000)

In [30]:
df.write.parquet('spark/client.parquet')

In [4]:
clients = spark.read.parquet('spark/client.parquet')

In [8]:
def create_drivers(dataset, n_client=200):
    id_start = 0
    
    if dataset != None:
        id_start = dataset.agg({'id': 'max'}).collect()[0]['max(id)'] + 1
    rows = []
    
    for i in range(n_client):
        firstName = ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5,25)))
        lastName = ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5,25)))
        car = ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5,25)))
        row = (id_start, firstName, lastName, car)
        rows.append(row)
        id_start += 1
        
    df = spark.createDataFrame(rows, schema='Id bigint, FirstName string, LastName string, Car string')
    
    if dataset != None:
        df = DataFrame.unionAll(dataset, df)
        
    return df

In [24]:
df = create_drivers(None, 500000)

In [25]:
df.write.parquet('spark/driver.parquet')

In [5]:
drivers = spark.read.parquet('spark/driver.parquet')

In [6]:
london_address = spark.read.csv('spark/london-address.csv', header=True)

In [16]:
def create_orders(clients, drivers, london_address, orders, n_count=50):
    ids_clients = clients.select('Id').collect()
    ids_drivers = drivers.select('Id').collect()
    coordinates = london_address.select('oseast1m', 'osnrth1m').collect()
    grids = [{'oseast1m': int(i['oseast1m']),'osnrth1m': int(i['osnrth1m'])} for i in coordinates]
    
    id_start = 0
    
    if orders != None:
        id_start = orders.agg({'id': 'max'}).collect()[0]['max(id)'] + 1
    rows = []
    
    for i in range(n_count):
            
        client_id = random.choice(ids_clients)
        driver_id = random.choice(ids_drivers)
        ids_clients.remove(client_id)
        ids_drivers.remove(driver_id)
        price = random.randint(0, 1500)
        created = datetime.utcnow()
        start = random.choice(grids)
        end = random.choice(grids)
        
        while end['oseast1m'] == start['oseast1m'] and end['osnrth1m'] == start['osnrth1m']:
            end = random.choice(grids)
            
        row = (id_start, str(start), str(end), client_id['Id'], driver_id['Id'], price, False, created, created)

        rows.append(row)
        id_start += 1
        
    df = spark.createDataFrame(rows, schema='Id bigint, Start string, End string, ClientId bigint, DriverId bigint, Price bigint, Closed boolean, Created timestamp, Modified timestamp')
    
    if orders != None:
        df = DataFrame.unionAll(orders, df)
        
    return df

In [18]:
orders = create_orders(clients, drivers, london_address, None, n_count=250000)

In [20]:
orders.write.parquet('spark/orders.parquet')

In [7]:
orders = spark.read.parquet('spark/orders.parquet')

In [23]:
def simulate_movement(drivers, orders, london_address, movement, n_count_order=125000, n_count_worde=100000):
    ID = 'Id'
    DRIVER_ID = 'DriverId'
    POSITION = 'Position'
    START_TIME = 'StartTime'
    END_TIME = 'EndTime'
    BOOKED = 'Booked'
    orders_booked = spark.createDataFrame(orders.filter('Closed = False').collect()[:n_count_order])
    ids_drivers = drivers.select(drivers.Id).join(orders_booked, orders_booked.DriverId==drivers.Id, 'left').where('DriverId is null')
    ids_drivers = ids_drivers.select(drivers.Id).collect()[:n_count_worde]
    
    ids_drivers_booked = orders_booked.select('DriverId').collect()
    coordinates = london_address.select('oseast1m', 'osnrth1m').collect()
    grids = [str({'oseast1m': int(i['oseast1m']),'osnrth1m': int(i['osnrth1m'])}) for i in coordinates]
    
    id_start = 0
    
    if movement != None:
        id_start = movement.agg({'id': 'max'}).collect()[0]['max(id)'] + 1
    
    rows = []
    for i in ids_drivers_booked:
        driverId = i['DriverId']
        row = (id_start, driverId, None, None, None, True)
        rows.append(row)
        id_start += 1
        
    for i in ids_drivers:
        driverId = i['Id']
        row = (id_start, driverId, None, None, None, False)
        rows.append(row)
        id_start += 1
    
    df_movement = spark.createDataFrame(rows, schema='Id bigint, DriverId bigint, Position string, StartTime timestamp, EndTime timestamp, Booked boolean')
    rows = []
    n = n_count_order + n_count_worde
    delta = dt.timedelta(minutes=30)
    times = [{'start': datetime.utcnow(), 'end': datetime.utcnow() + delta} for i in range(n)]
    k = 0
    for i in range(100):
        candidates = random.choices(grids, k=n)
        
        for j in range(n_count_order):
            id_driver = ids_drivers_booked[j]['DriverId']
            position = candidates[j]
            order = orders_booked.filter(orders_booked.DriverId == id_driver).collect()[0]
            movement = df_movement.filter(df_movement.DriverId == id_driver).collect()[0]
            if (order['End'] == position):
                rows.append((movement[ID], movement[DRIVER_ID], position, times[j]['start'], times[j]['end'], False))
            else:
                rows.append((movement[ID], movement[DRIVER_ID], position, times[j]['start'], None, True))
            
            if i == 99:
                rows.append((movement[ID], movement[DRIVER_ID], position, times[j]['end'], times[j]['end'], False))
                
        for j in range(n_count_order, n):
            id_driver = ids_drivers[j-n_count_order]['Id']
            position = candidates[j]
            movement = df_movement.filter(df_movement.DriverId == id_driver).collect()[0]
            rows.append((movement[ID], movement[DRIVER_ID], position, movement[START_TIME], movement[END_TIME], movement[BOOKED]))
        
                    
        if k % 100 == 0:
            print(k)
            print(candidates)

        k += 1
    df = spark.createDataFrame(rows, schema='Id bigint, DriverId bigint, Position string, StartTime timestamp, EndTime timestamp, Booked boolean')
    return df, df_movement

In [65]:
df, df_movement = simulate_movement(drivers, orders_back, london_address, None, n_count_order=5, n_count_worde=5)

0
["{'oseast1m': 523167, 'osnrth1m': 189445}", "{'oseast1m': 531739, 'osnrth1m': 172489}", "{'oseast1m': 543760, 'osnrth1m': 186389}", "{'oseast1m': 530647, 'osnrth1m': 176164}", "{'oseast1m': 535483, 'osnrth1m': 195078}", "{'oseast1m': 533985, 'osnrth1m': 166601}", "{'oseast1m': 517054, 'osnrth1m': 177215}", "{'oseast1m': 521584, 'osnrth1m': 188555}", "{'oseast1m': 536713, 'osnrth1m': 178558}", "{'oseast1m': 526041, 'osnrth1m': 183910}"]


In [64]:
orders_back = spark.read.parquet('spark/orders_backup.parquet')

In [12]:
!ls ./spark

client.csv	driver.parquet		 movement.parquet
client.parquet	feedback.parquet	 orders_backup.parquet
data.zip	london-address.csv	 orders.parquet
driver.csv	movement_backup.parquet


In [78]:
movement_new = df.unionAll(df_movement)

In [79]:
closed = movement_new.filter('Booked==False and EndTime is not null').collect()

In [80]:
closed

[Row(Id=0, DriverId=196537, Position="{'oseast1m': 547219, 'osnrth1m': 172384}", StartTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26373), EndTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26373), Booked=False),
 Row(Id=1, DriverId=48763, Position="{'oseast1m': 538250, 'osnrth1m': 165993}", StartTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26374), EndTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26374), Booked=False),
 Row(Id=2, DriverId=335348, Position="{'oseast1m': 541967, 'osnrth1m': 185651}", StartTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26375), EndTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26375), Booked=False),
 Row(Id=3, DriverId=109440, Position="{'oseast1m': 531534, 'osnrth1m': 182589}", StartTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26375), EndTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26375), Booked=False),
 Row(Id=4, DriverId=120362, Position="{'oseast1m': 528487, 'osnrth1m': 164905}", StartTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 263

In [103]:
new_orders = orders_back.withColumn('Closed', F.when(F.col('DriverId').between(closed[0]['DriverId'],closed[0]['DriverId']),True).otherwise(F.col('Closed')))
new_orders = new_orders.withColumn('Modified', F.when(F.col('DriverId').between(closed[0]['DriverId'],closed[0]['DriverId']),datetime.utcnow()).otherwise(F.col('Modified')))

In [105]:
for i in range(1, len(closed)):
    new_orders = new_orders.withColumn('Closed', F.when(F.col('DriverId').between(closed[i]['DriverId'],closed[i]['DriverId']),True).otherwise(F.col('Closed')))
    new_orders = new_orders.withColumn('Modified', F.when(F.col('DriverId').between(closed[i]['DriverId'],closed[i]['DriverId']),datetime.utcnow()).otherwise(F.col('Modified')))

In [106]:
closed

[Row(Id=0, DriverId=196537, Position="{'oseast1m': 547219, 'osnrth1m': 172384}", StartTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26373), EndTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26373), Booked=False),
 Row(Id=1, DriverId=48763, Position="{'oseast1m': 538250, 'osnrth1m': 165993}", StartTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26374), EndTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26374), Booked=False),
 Row(Id=2, DriverId=335348, Position="{'oseast1m': 541967, 'osnrth1m': 185651}", StartTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26375), EndTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26375), Booked=False),
 Row(Id=3, DriverId=109440, Position="{'oseast1m': 531534, 'osnrth1m': 182589}", StartTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26375), EndTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 26375), Booked=False),
 Row(Id=4, DriverId=120362, Position="{'oseast1m': 528487, 'osnrth1m': 164905}", StartTime=datetime.datetime(2021, 12, 16, 3, 58, 55, 263

In [107]:
new_orders.filter('Closed == True').show()

+------+--------------------+--------------------+--------+--------+-----+------+--------------------+--------------------+
|    Id|               Start|                 End|ClientId|DriverId|Price|Closed|             Created|            Modified|
+------+--------------------+--------------------+--------+--------+-----+------+--------------------+--------------------+
|187392|{'oseast1m': 5286...|{'oseast1m': 5336...|   85645|  196537|  673|  true|2021-12-15 21:22:...|2021-12-16 03:47:...|
|187393|{'oseast1m': 5389...|{'oseast1m': 5361...|  346039|   48763|  128|  true|2021-12-15 21:22:...|2021-12-16 03:47:...|
|187394|{'oseast1m': 5316...|{'oseast1m': 5165...|  914349|  335348| 1279|  true|2021-12-15 21:22:...|2021-12-16 03:47:...|
|187395|{'oseast1m': 5325...|{'oseast1m': 5246...|  895537|  109440| 1155|  true|2021-12-15 21:22:...|2021-12-16 03:47:...|
|187396|{'oseast1m': 5197...|{'oseast1m': 5251...|  895033|  120362|  921|  true|2021-12-15 21:22:...|2021-12-16 03:47:...|
+------+

In [108]:
new_orders.write.parquet('spark/orders.parquet', mode='overwrite')

In [109]:
movement_new.show()

+---+--------+--------------------+--------------------+-------+------+
| Id|DriverId|            Position|           StartTime|EndTime|Booked|
+---+--------+--------------------+--------------------+-------+------+
|  0|  196537|{'oseast1m': 5231...|2021-12-16 03:28:...|   null|  true|
|  1|   48763|{'oseast1m': 5317...|2021-12-16 03:28:...|   null|  true|
|  2|  335348|{'oseast1m': 5437...|2021-12-16 03:28:...|   null|  true|
|  3|  109440|{'oseast1m': 5306...|2021-12-16 03:28:...|   null|  true|
|  4|  120362|{'oseast1m': 5354...|2021-12-16 03:28:...|   null|  true|
|  5|       0|{'oseast1m': 5339...|                null|   null| false|
|  6|       1|{'oseast1m': 5170...|                null|   null| false|
|  7|       5|{'oseast1m': 5215...|                null|   null| false|
|  8|       6|{'oseast1m': 5367...|                null|   null| false|
|  9|       7|{'oseast1m': 5260...|                null|   null| false|
|  0|  196537|{'oseast1m': 5329...|2021-12-16 03:28:...|   null|

In [99]:
movement_new.write.parquet('spark/movement.parquet', mode='overwrite')

In [9]:
movement = spark.read.parquet('spark/movement.parquet')

In [110]:
movement.where('Booked == False and StartTime is null').show()

+---+--------+--------------------+--------------------+--------------------+------+
| Id|DriverId|            Position|           StartTime|             EndTime|Booked|
+---+--------+--------------------+--------------------+--------------------+------+
|  0|  196537|{'oseast1m': 5472...|2021-12-16 03:58:...|2021-12-16 03:58:...| false|
|  1|   48763|{'oseast1m': 5382...|2021-12-16 03:58:...|2021-12-16 03:58:...| false|
|  2|  335348|{'oseast1m': 5419...|2021-12-16 03:58:...|2021-12-16 03:58:...| false|
|  3|  109440|{'oseast1m': 5315...|2021-12-16 03:58:...|2021-12-16 03:58:...| false|
|  4|  120362|{'oseast1m': 5284...|2021-12-16 03:58:...|2021-12-16 03:58:...| false|
+---+--------+--------------------+--------------------+--------------------+------+



In [168]:
def create_feedback(orders, delta, feedback=None):

    id_start = 0
    
    if movement != None:
        id_start = movement.agg({'id': 'max'}).collect()[0]['max(id)'] + 1

    orders_closed = orders.filter('Closed == True').filter(F.to_utc_timestamp(F.current_timestamp(), tz='+02').cast('long') - F.col('Modified').cast('long') < delta).collect()
    
    rows = []
    for order in orders_closed:
        raitingClient = random.randint(1,6)
        raitingDriver = random.randint(1,6)
        messageClient = ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5,25)))
        messageDriver = ''.join(random.choices(string.ascii_letters + string.digits, k=random.randint(5,25)))
        row = (id_start, order['DriverId'], order['Id'], order['ClientId'], raitingClient, raitingDriver, messageDriver, messageClient)
        rows.append(row)
        id_start += 1
    
    df = spark.createDataFrame(rows, schema='Id bigint, DriverId bigint, OrderId bigint, ClientId bigint, RaitingClient int, RaitingDriver int, MessageDriver string, MessageClient string')
    
    return df

In [172]:
feedback = create_feedback(new_orders, 4000)

In [173]:
feedback.write.parquet('spark/feedback.parquet')

In [10]:
feedback = spark.read.parquet('spark/feedback.parquet')

In [24]:
feedback.select('DriverId', F.col('RaitingDriver').alias('Raiting')).groupBy('DriverId').agg({'Raiting': 'avg'}).show()

+--------+------------+
|DriverId|avg(Raiting)|
+--------+------------+
|  335348|         2.0|
|   48763|         2.0|
|  109440|         3.0|
|  120362|         1.0|
|  196537|         3.0|
+--------+------------+

