In [298]:
# import necessary libraries
import pandas as pd 
import numpy
import matplotlib.pyplot as plt 
from pyspark.sql import SparkSession# create sparksession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .appName("RecruitRestaurantVisitorForecasting").config("spark.local.dir","/home/jovyan/work") \
    .getOrCreate()

## Air Reserve Statistics

In [211]:
%%time
df = spark.read.csv("../data/air_reserve.csv.gz", header='true', inferSchema='true')

CPU times: user 10 ms, sys: 0 ns, total: 10 ms
Wall time: 1.34 s


In [219]:
df.show(5)

+--------------------+-------------------+-------------------+----------------+
|        air_store_id|     visit_datetime|   reserve_datetime|reserve_visitors|
+--------------------+-------------------+-------------------+----------------+
|air_877f79706adbfb06|2016-01-01 19:00:00|2016-01-01 16:00:00|               1|
|air_db4b38ebe7a7ceff|2016-01-01 19:00:00|2016-01-01 19:00:00|               3|
|air_db4b38ebe7a7ceff|2016-01-01 19:00:00|2016-01-01 19:00:00|               6|
|air_877f79706adbfb06|2016-01-01 20:00:00|2016-01-01 16:00:00|               2|
|air_db80363d35f10926|2016-01-01 20:00:00|2016-01-01 01:00:00|               5|
+--------------------+-------------------+-------------------+----------------+
only showing top 5 rows



In [213]:
df.printSchema()

root
 |-- air_store_id: string (nullable = true)
 |-- visit_datetime: timestamp (nullable = true)
 |-- reserve_datetime: timestamp (nullable = true)
 |-- reserve_visitors: integer (nullable = true)



In [214]:
df.show(5)

+--------------------+-------------------+-------------------+----------------+
|        air_store_id|     visit_datetime|   reserve_datetime|reserve_visitors|
+--------------------+-------------------+-------------------+----------------+
|air_877f79706adbfb06|2016-01-01 19:00:00|2016-01-01 16:00:00|               1|
|air_db4b38ebe7a7ceff|2016-01-01 19:00:00|2016-01-01 19:00:00|               3|
|air_db4b38ebe7a7ceff|2016-01-01 19:00:00|2016-01-01 19:00:00|               6|
|air_877f79706adbfb06|2016-01-01 20:00:00|2016-01-01 16:00:00|               2|
|air_db80363d35f10926|2016-01-01 20:00:00|2016-01-01 01:00:00|               5|
+--------------------+-------------------+-------------------+----------------+
only showing top 5 rows



In [215]:
df.createOrReplaceTempView("ar")

In [217]:
ar_stat = spark.sql("""
select air_store_id
     , to_date(visit_datetime) as visit_date
     , sum( unix_timestamp(visit_datetime)- unix_timestamp(reserve_datetime)/3600 ) as rs1
     , sum(reserve_visitors) as rv1
     , avg( unix_timestamp(visit_datetime)- unix_timestamp(reserve_datetime)/3600 ) as rs2
     , avg(reserve_visitors) as rv2   
from ar group by 1,2
""")

In [218]:
ar_stat.show(5)

+--------------------+----------+---------------+---+--------------------+------------------+
|        air_store_id|visit_date|            rs1|rv1|                 rs2|               rv2|
+--------------------+----------+---------------+---+--------------------+------------------+
|air_32460819c7600037|2016-01-13|  4.356935221E9| 17|1.4523117403333333E9| 5.666666666666667|
|air_7831b00996701c0f|2016-01-22|  4.359249633E9| 23|       1.453083211E9| 7.666666666666667|
|air_54ed43163b7596c4|2016-01-25|  1.453341077E9|  8|       1.453341077E9|               8.0|
|air_2a3743e37aab04b4|2016-02-05|  4.362866499E9|  8|       1.454288833E9|2.6666666666666665|
|air_8093d0b565e9dbdf|2016-02-17|1.1642632051E10| 19|    1.455329006375E9|             2.375|
+--------------------+----------+---------------+---+--------------------+------------------+
only showing top 5 rows



In [238]:
%%time
hpg = spark.read.csv("../data/hpg_reserve.csv.gz", header='true', inferSchema='true')

CPU times: user 0 ns, sys: 10 ms, total: 10 ms
Wall time: 23.3 s


In [239]:
hpg.printSchema()

root
 |-- hpg_store_id: string (nullable = true)
 |-- visit_datetime: timestamp (nullable = true)
 |-- reserve_datetime: timestamp (nullable = true)
 |-- reserve_visitors: integer (nullable = true)



In [240]:
hpg.show(5)

+--------------------+-------------------+-------------------+----------------+
|        hpg_store_id|     visit_datetime|   reserve_datetime|reserve_visitors|
+--------------------+-------------------+-------------------+----------------+
|hpg_c63f6f42e088e50f|2016-01-01 11:00:00|2016-01-01 09:00:00|               1|
|hpg_dac72789163a3f47|2016-01-01 13:00:00|2016-01-01 06:00:00|               3|
|hpg_c8e24dcf51ca1eb5|2016-01-01 16:00:00|2016-01-01 14:00:00|               2|
|hpg_24bb207e5fd49d4a|2016-01-01 17:00:00|2016-01-01 11:00:00|               5|
|hpg_25291c542ebb3bc2|2016-01-01 17:00:00|2016-01-01 03:00:00|              13|
+--------------------+-------------------+-------------------+----------------+
only showing top 5 rows



In [241]:
link = spark.read.csv("../data/store_id_relation.csv.gz", header='true',inferSchema='true')

In [242]:
link.printSchema()

root
 |-- air_store_id: string (nullable = true)
 |-- hpg_store_id: string (nullable = true)



In [243]:
link.show(5)

+--------------------+--------------------+
|        air_store_id|        hpg_store_id|
+--------------------+--------------------+
|air_63b13c56b7201bd9|hpg_4bc649e72e2a239a|
|air_a24bf50c3e90d583|hpg_c34b496d0305a809|
|air_c7f78b4f3cba33ff|hpg_cd8ae0d9bbd58ff9|
|air_947eb2cae4f3e8f2|hpg_de24ea49dc25d6b8|
|air_965b2e0cf4119003|hpg_653238a84804d8e7|
+--------------------+--------------------+
only showing top 5 rows



In [251]:
hpg = hpg.alias('m').join(link.alias('h'), hpg.hpg_store_id == link.hpg_store_id).select('h.air_store_id','m.*')

In [252]:
hpg.show()

+--------------------+--------------------+-------------------+-------------------+----------------+
|        air_store_id|        hpg_store_id|     visit_datetime|   reserve_datetime|reserve_visitors|
+--------------------+--------------------+-------------------+-------------------+----------------+
|air_db80363d35f10926|hpg_878cc70b1abc76f7|2016-01-01 19:00:00|2016-01-01 15:00:00|               4|
|air_08cb3c4ee6cd6a22|hpg_dc639640420bde5f|2016-01-01 19:00:00|2016-01-01 16:00:00|               2|
|air_6b15edd1b4fbb96a|hpg_babe2c3d962d7bb6|2016-01-02 17:00:00|2016-01-01 22:00:00|               3|
|air_37189c92b6c761ec|hpg_2e10e1956528199a|2016-01-02 18:00:00|2016-01-02 17:00:00|               2|
|air_37189c92b6c761ec|hpg_2e10e1956528199a|2016-01-02 18:00:00|2016-01-01 20:00:00|               2|
|air_37189c92b6c761ec|hpg_2e10e1956528199a|2016-01-02 18:00:00|2016-01-01 21:00:00|               3|
|air_c1ff20617c54fee7|hpg_4be4a5cb851e45af|2016-01-02 18:00:00|2016-01-02 16:00:00|        

In [253]:
hpg.createOrReplaceTempView("hpg")

In [254]:
spark.sql("select * from hpg limit 1").show()

+--------------------+--------------------+-------------------+-------------------+----------------+
|        air_store_id|        hpg_store_id|     visit_datetime|   reserve_datetime|reserve_visitors|
+--------------------+--------------------+-------------------+-------------------+----------------+
|air_db80363d35f10926|hpg_878cc70b1abc76f7|2016-01-01 19:00:00|2016-01-01 15:00:00|               4|
+--------------------+--------------------+-------------------+-------------------+----------------+



In [256]:
hpg_stat = spark.sql("""
select air_store_id
     , to_date(visit_datetime) as visit_date
     , sum( unix_timestamp(visit_datetime)- unix_timestamp(reserve_datetime)/3600 ) as rs1
     , sum(reserve_visitors) as rv1
     , avg( unix_timestamp(visit_datetime)- unix_timestamp(reserve_datetime)/3600 ) as rs2
     , avg(reserve_visitors) as rv2   
from hpg group by 1,2
""")

In [262]:
hpg_stat.cache()
ar_stat.cache()

DataFrame[air_store_id: string, visit_date: date, rs1: double, rv1: bigint, rs2: double, rv2: double]

In [263]:
train = spark.read.csv("../data/air_visit_data.csv", header='true',inferSchema='true')

In [267]:
train.show(3)

+--------------------+-------------------+--------+
|        air_store_id|         visit_date|visitors|
+--------------------+-------------------+--------+
|air_ba937bf13d40fb24|2016-01-13 00:00:00|      25|
|air_ba937bf13d40fb24|2016-01-14 00:00:00|      32|
|air_ba937bf13d40fb24|2016-01-15 00:00:00|      29|
+--------------------+-------------------+--------+
only showing top 3 rows



In [265]:
train.createOrReplaceTempView("train")
hpg_stat.createOrReplaceTempView("hpg_stat")
ar_stat.createOrReplaceTempView("ar_stat")

In [270]:
train = spark.sql("""
select t.*, a.rs1 as rs1_x, a.rv1 as rv1_x, a.rs2 as rs2_x, a.rv2 as rv2_x
, h.rs1 as rs1_y, h.rv1 as rv1_y, h.rs2 as rs2_y, h.rv2 as rv2_y 
FROM train t left join hpg_stat h on (t.air_store_id  = h.air_store_id and t.visit_date = h.visit_date)
LEFT JOIN ar_stat a on (t.air_store_id  = a.air_store_id and t.visit_date = a.visit_date)
""")

In [271]:
train.show(5)

+--------------------+-------------------+--------+-----+-----+-----+-----+-----+-----+-----+-----+
|        air_store_id|         visit_date|visitors|rs1_x|rv1_x|rs2_x|rv2_x|rs1_y|rv1_y|rs2_y|rv2_y|
+--------------------+-------------------+--------+-----+-----+-----+-----+-----+-----+-----+-----+
|air_ba937bf13d40fb24|2016-01-13 00:00:00|      25| null| null| null| null| null| null| null| null|
|air_ba937bf13d40fb24|2016-01-14 00:00:00|      32| null| null| null| null| null| null| null| null|
|air_ba937bf13d40fb24|2016-01-15 00:00:00|      29| null| null| null| null| null| null| null| null|
|air_ba937bf13d40fb24|2016-01-16 00:00:00|      22| null| null| null| null| null| null| null| null|
|air_ba937bf13d40fb24|2016-01-18 00:00:00|       6| null| null| null| null| null| null| null| null|
+--------------------+-------------------+--------+-----+-----+-----+-----+-----+-----+-----+-----+
only showing top 5 rows



In [316]:
train_df = train.toPandas()

train_df.to_csv('../data/to2ml/round2_train.csv.gz',compression='gzip',index=False)

In [272]:
test = spark.read.csv("../data/sample_submission.csv.gz", header='true')

In [299]:
test.show(3,truncate=False)

test.createOrReplaceTempView("test")

+-------------------------------+--------+
|id                             |visitors|
+-------------------------------+--------+
|air_00a91d42b08b08d9_2017-04-23|0       |
|air_00a91d42b08b08d9_2017-04-24|0       |
|air_00a91d42b08b08d9_2017-04-25|0       |
+-------------------------------+--------+
only showing top 3 rows



In [292]:
def rsplit_part(s, sep, part_num):
    return s.rsplit(sep,1)[part_num - 1]

In [295]:
spark.udf.register('rsplit_part',rsplit_part)

In [307]:
spark.sql("select rsplit_part(id,'_',1) as air_store_id,to_date(rsplit_part(id,'_',2)) as visit_date  from test limit 1").show(truncate=False)

+--------------------+----------+
|air_store_id        |visit_date|
+--------------------+----------+
|air_00a91d42b08b08d9|2017-04-23|
+--------------------+----------+



In [308]:
test = spark.sql("""
select t.*, a.rs1 as rs1_x, a.rv1 as rv1_x, a.rs2 as rs2_x, a.rv2 as rv2_x
, h.rs1 as rs1_y, h.rv1 as rv1_y, h.rs2 as rs2_y, h.rv2 as rv2_y 
FROM (select rsplit_part(id,'_',1) as air_store_id,to_date(rsplit_part(id,'_',2)) as visit_date  from test) t left join hpg_stat h on (t.air_store_id  = h.air_store_id and t.visit_date = h.visit_date)
LEFT JOIN ar_stat a on (t.air_store_id  = a.air_store_id and t.visit_date = a.visit_date)
""")

In [315]:
test_df = test.toPandas()

test_df.to_csv('../data/to2ml/round2_test.csv.gz',compression='gzip',index=False)