In [2]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import udf, lit
from pyspark.sql.types import *#IntegerType, StringType
import pyspark.sql.functions as F
import json
import itertools
import math
from datetime import datetime, timedelta
import pandas as pd
import copy
import numpy as np
import matplotlib.pyplot as plt
import random
from tqdm import tqdm

In [3]:
# Randomization of predictors.
# Time-of-day labels mapped to ordinal codes, plus the reverse lookup (code -> label).
d = dict(morning=0, afternoon=1, evening=2, night=3)
inv_d = {code: label for label, code in d.items()}

# Departure columns and the arrival columns they are paired with (same position in each list).
departs = [
    'Depart_hour1_There',
    'Depart_hour2_There',
    'Depart_hour1_Back',
    'Depart_hour2_Back',
]
arrives = [
    'Arrival_hour1_There',
    'Arrival_hour2_There',
    'Arrival_hour1_Back',
    'Arrival_hour2_Back',
]

In [24]:
@udf
def groupped_days(days, bin_size):
    """Return the label ``'<low>-<high>'`` of the bin that contains ``days``.

    Bins are ``bin_size`` wide and start at multiples of ``bin_size``; both
    bounds in the label are inclusive (e.g. days=7, bin_size=5 -> ``'5-9'``).
    """
    width = int(bin_size)
    lower = (days // width) * width
    upper = lower + width - 1
    return '{}-{}'.format(lower, upper)

def exists(obj, datastructure):
    """Return True if at least one element of ``datastructure`` equals ``obj``.

    Each element is compared with ``obj == elem`` (``obj`` on the left-hand
    side). A generator expression is used so the scan stops at the first
    match instead of comparing every element and building a temporary list.
    An empty ``datastructure`` yields False.
    """
    return any(obj == elem for elem in datastructure)

@udf
def random_num():
    """Draw a random integer between 0 and 3, both ends included."""
    lowest, highest = 0, 3
    return random.randint(lowest, highest)


def subtr(depart, arrive):
    """Return the absolute distance between two time-of-day labels.

    Both labels are translated to their codes through the module-level ``d``
    mapping. When ``depart`` is the placeholder ``'none'`` no lookup is done
    and the constant 10 is returned instead.
    """
    if depart == 'none':
        return 10
    return abs(d[arrive] - d[depart])
    
def update_time_depart():
    """Pick a random time-of-day label for a departure.

    A code between 0 and 3 (inclusive) is drawn with ``random.randint`` and
    translated to its label through the module-level ``inv_d`` mapping.
    """
    return inv_d[random.randint(0, 3)]

def update_time_arrive(depart, arrive):
    """Pick a new arrival time-of-day label.

    A random code in 0..3 is drawn first, then shifted by
    ``subtr(depart, arrive)``; the sum is wrapped modulo 4 and translated to a
    label through the module-level ``inv_d`` mapping.
    """
    shift = random.randint(0, 3)
    gap = subtr(depart, arrive)
    wrapped = (int(shift) + int(gap)) % 4
    return inv_d[wrapped]

def days_update(days):
    """Return ``days`` unchanged up to 90; above 90 return a random integer in 0..10."""
    return random.randint(0, 10) if days > 90 else days
    
def update_price(full_price, days, country_from, country_to, scrap_time, bin_size):
    """Sample a new price from the statistics of the nearest day-bin for a route.

    The module-level ``bin_stats`` frame is filtered by ``Country_from``,
    ``Country_to`` and ``Scrap_time``. Among the remaining rows, the one whose
    ``startingBin`` is closest to ``(days // bin_size) * bin_size`` is used
    (the first one wins on ties). The price is drawn from a normal
    distribution with that row's ``mean`` and ``stdev``, and a uniform value
    from ``[0, 1)`` is added on top.

    Parameters:
        full_price: unused; kept so existing callers keep working.
        days: number of days used to select the bin.
        country_from, country_to: route keys matched against ``bin_stats``.
        scrap_time: converted with ``int()`` before matching ``Scrap_time``.
        bin_size: bin width; converted with ``int()``.

    Returns:
        The sampled price as a Python float.

    Raises:
        ValueError: if ``bin_stats`` has no row for the requested route and
            scrap time.
    """
    bin_size = int(bin_size)
    route_mask = (
        (bin_stats['Country_from'] == country_from)
        & (bin_stats['Country_to'] == country_to)
        & (bin_stats['Scrap_time'] == int(scrap_time))
    )
    records = bin_stats[route_mask]
    if records.empty:
        # Without this guard the failure surfaces as an opaque
        # "argmin of an empty sequence" ValueError from numpy.
        raise ValueError(
            'no bin statistics for route {!r} -> {!r} at scrap time {!r}'.format(
                country_from, country_to, scrap_time))

    # Choose the available bin closest to the one `days` falls into.
    target_bin = (days // bin_size) * bin_size
    available_bins = np.asarray(records['startingBin'].tolist())
    nearest = np.abs(available_bins - target_bin).argmin()
    record = records.iloc[nearest]

    mean = float(record['mean'])
    stdev = float(record['stdev'])
    # A bin may consist of a single observation, in which case it has no
    # standard deviation (NaN); fall back to a standard deviation of 1.
    scale = 1 if np.isnan(stdev) else stdev

    new_price = np.random.normal(loc=mean, scale=scale, size=1)[0]
    new_price = new_price + np.random.rand()
    return new_price.tolist()

@udf
def myRandom(x):
    """Return a random integer in 0..10 (inclusive); the argument ``x`` is ignored."""
    lowest, highest = 0, 10
    return random.randint(lowest, highest)

In [5]:
# Spark configuration for this notebook; speculative task execution is switched off.
conf = SparkConf().setAppName('MyFirstStandaloneApp')
conf.set("spark.speculation", "false")
# getOrCreate() returns the already-running context if there is one, so re-executing
# this cell in a live kernel does not fail with "Cannot run multiple SparkContexts
# at once". On a fresh kernel it creates the context from `conf`, as before.
sc = SparkContext.getOrCreate(conf=conf)

In [6]:
bin_size = 5

# Columns that are read as strings and then cast to numeric Spark types.
to_cast_int = ["Flight_id", "Days", "Journey_time"]
to_cast_double = ["Price1_There", "Price1_Back", "Price2_There", "Price2_Back", "Full_Price"]

sqlContext = SQLContext(sc)


def _load_flights(path):
    """Read a ';'-separated text file whose first line is a header into a Spark DataFrame.

    Every column is created as a nullable string; the columns listed in
    ``to_cast_int`` / ``to_cast_double`` are then cast to integer / double.
    """
    lines = sc.textFile(path)
    header = lines.first()
    colnames = header.split(';')
    # `header` is local to this call, so the lazily evaluated filter below always
    # compares against the header of *this* file, even after another file is loaded.
    rows = lines.filter(lambda line: line != header).map(lambda line: line.split(';'))
    schema = StructType([StructField(field_name, StringType(), True) for field_name in colnames])
    frame = sqlContext.createDataFrame(rows, schema)
    for field in to_cast_int:
        frame = frame.withColumn(field, frame[field].cast(IntegerType()))
    for field in to_cast_double:
        frame = frame.withColumn(field, frame[field].cast(DoubleType()))
    return frame


def _price_stats(frame, last_key):
    """Aggregate mean / stddev / count of Full_Price per scrap time, route and ``last_key``."""
    return frame.groupby(['Scrap_time', 'Country_from', 'Country_to', last_key]).agg(
        F.mean(frame.Full_Price).alias('mean'),
        F.stddev(frame.Full_Price).alias('stdev'),
        F.count(frame.Full_Price).alias('count'))


# --- Historical data: derive bins and collect price statistics -------------------
filepath = "Structured_data2"
dt = _load_flights(filepath)
dt.registerTempTable("flights")

weekDay = udf(lambda x: datetime.strptime(x, '%Y-%m-%d').strftime('%w'))
# Extracts the lower bound from a '<low>-<high>' bin label.
fun = udf(lambda x: int(x.split('-')[0]))

df = dt.withColumn('weekDay', weekDay(dt['Scrap_date']))
df = df.withColumn('grouppedDays', groupped_days(df['Days'], lit(bin_size)))
df = df.withColumn('startingBin', fun(df.grouppedDays))

df1 = _price_stats(df, 'startingBin')
df2 = _price_stats(df, 'Days')

days_stats = df2.toPandas()
bin_stats = df1.toPandas()

# Normalise dtypes of the collected statistics. Each frame is handled on its own,
# so the casts do not depend on both frames having the same number of columns.
asint = ['Scrap_time', 'startingBin', 'count', 'Days']
asfloat = ['mean', 'stdev']
for stats in (days_stats, bin_stats):
    for column in list(stats.columns):
        if column in asint:
            stats[column] = stats[column].astype('int')
        elif column in asfloat:
            stats[column] = stats[column].astype('float')

# --- Reference data: the frame the generated days are derived from ---------------
filepath = "Reference_file"
dt = _load_flights(filepath)
# Re-registers the "flights" temp table, which now points at the reference data.
dt.registerTempTable("flights")

datafr = dt.toPandas()

In [9]:
# Keep an independent snapshot of the loaded reference frame.
cp = datafr.copy(deep=True)

In [25]:
# Reset the working frame to an independent copy of the snapshot held in `cp`.
datafr = cp.copy(deep=True)

In [None]:
# Register `progress_apply` on pandas objects once instead of on every iteration.
tqdm.pandas()

for i in range(1, 60):
    # Save and reload the frame so that pandas infers the column types from the CSV text.
    # NOTE(review): `filename` is built from the date *before* it is shifted below and is
    # reused for the final write, so a file named after date D ends up holding rows dated
    # D-1. Confirm whether that is intended before relying on the file names.
    filename = './Generated/' + str(datafr['Scrap_date'][0]) + '.txt'
    datafr.to_csv(filename, sep=';', header=True, index=False)
    datafr = pd.read_csv(filename, sep=';')

    # Move every row one day back in time.
    datafr['Scrap_date'] = datafr['Scrap_date'].map(
        lambda date_str: (datetime.strptime(date_str, '%Y-%m-%d') - timedelta(days=1)).strftime('%Y-%m-%d'))

    # Columns are read by position through `.iloc`: plain `row[5]` on a string-labelled
    # row relies on a positional fallback that pandas has deprecated.
    # Position 5 is presumably the `Days` column - confirm against the file header.
    datafr['Days'] = datafr.apply(
        lambda row: (row.iloc[5] + 1) if (row.iloc[5] + 1 < 90) else random.randint(0, 10),
        axis=1)

    # Re-draw departure/arrival times of day; the 'none' placeholders are left untouched.
    for depart, arrive in zip(departs, arrives):
        datafr[depart] = datafr.apply(
            lambda row: update_time_depart() if row[depart] != 'none' else 'none',
            axis=1)
        datafr[arrive] = datafr.apply(
            lambda row: update_time_arrive(row[depart], row[arrive]) if row[arrive] != 'none' else 'none',
            axis=1)

    # Positional arguments: last column, then positions 5, 2, 3 and 1 are passed as
    # full_price, days, country_from, country_to and scrap_time respectively.
    datafr['Full_Price'] = datafr.progress_apply(
        lambda row: update_price(row.iloc[-1], row.iloc[5], row.iloc[2], row.iloc[3], row.iloc[1], bin_size),
        axis=1)
    datafr['Full_Price'] = np.round(datafr['Full_Price'], 2)
    datafr.to_csv(filename, sep=';', header=True, index=False)

100%|█████████████████████████████████████████████████████████████████████████| 136397/136397 [14:29<00:00, 156.84it/s]
 60%|████████████████████████████████████████████▍                             | 81821/136397 [08:37<06:12, 146.36it/s]