In [1]:
import configparser
from datetime import datetime
import time
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id, lit, \
    concat, lower
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, \
    date_format
from pyspark.sql.types import IntegerType, StringType, TimestampType, \
    StructType, StructField, DoubleType, LongType
from functools import reduce

In [2]:
spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()

In [3]:
get_timestampunix = udf(lambda x:
                            int(time.mktime(
                                datetime.strptime(x[:-3], '%Y-%m-%d %H:%M:%S.%f')\
                                    .timetuple())) if isinstance(x, str) else 0, IntegerType())
get_timestamp = udf(lambda x: int(int(x)/1000), IntegerType())
get_datetime = udf(lambda x: datetime.fromtimestamp(x), TimestampType())

In [4]:
input_data = '../data'
customerlogins_file = os.path.join(input_data, 'customerlogins.csv')
customerregistration_files = os.path.join(input_data, 'customerregistration*.csv')
instantgames_file = os.path.join(input_data, 'instantgamespurchases.csv')
lotterygames_file = os.path.join(input_data, 'lotterygamespurchases.csv')

In [5]:
log_df = spark.read.csv(customerlogins_file, header=True, sep=';')
reg_df = spark.read.csv(customerregistration_files, header=True, sep=';')
lottery_df = spark.read.csv(lotterygames_file, header=True, sep=';')
games_df = spark.read.csv(instantgames_file, header=True, sep=';')

In [6]:
log_df = log_df.withColumnRenamed("timestamp", "datetime")
log_df = log_df.withColumn("timestamp",
                           get_timestampunix(log_df.datetime))

In [7]:
reg_df = reg_df.withColumnRenamed("timestamp", "datetime")
reg_df = reg_df.withColumn("timestamp",
                          get_timestampunix(reg_df.datetime))

In [8]:
lottery_df = lottery_df.where(col("timestampunix").isNotNull())\
                       .withColumn("timestamp",
                                   get_timestamp(lottery_df.timestampunix))
lottery_df = lottery_df.where(col("timestamp").isNotNull())\
                       .withColumn("datetime",
                                   get_datetime(lottery_df.timestamp))
lottery_df = lottery_df.withColumn("amountineur",
                                   col("amountincents") / 100)\
                       .withColumn("feeineur",
                                   col("feeamountincents") / 100)\
                       .withColumn("winningsineur",
                                   lit(None).cast(StringType()))

In [9]:
games_df = games_df.withColumn("betindex", lit(None).cast(StringType()))\
                   .withColumn("discount", lit(None).cast(StringType()))
games_df = games_df.withColumnRenamed("timestamp", "datetime")
games_df = games_df.where(col("datetime").isNotNull())\
                   .withColumn("timestamp",
                              get_timestampunix(games_df.datetime))

In [10]:
time_log_df = log_df.dropDuplicates(["timestamp"])\
                       .select(col("timestamp"),
                               col("datetime"))\
                       .where(col("timestamp").isNotNull())
time_reg_df = reg_df.dropDuplicates(["timestamp"])\
                       .select(col("timestamp"),
                               col("datetime"))\
                       .where(col("timestamp").isNotNull())
time_lottery_df = lottery_df.dropDuplicates(["timestamp"])\
                            .select(col("timestamp"),
                                    col("datetime"))\
                            .where(col("timestamp").isNotNull())
time_games_df = games_df.dropDuplicates(["timestamp"])\
                           .select(col("timestamp"),
                                   col("datetime"))\
                           .where(col("timestamp").isNotNull())
time_table = reduce(lambda x, y: x.union(y), [time_log_df,
                                              time_reg_df,
                                              time_lottery_df,
                                              time_games_df])
time_table = time_table.dropDuplicates(["timestamp"])\
                       .withColumn("hour", hour("datetime"))\
                       .withColumn("day", dayofmonth("datetime"))\
                       .withColumn("week", weekofyear("datetime"))\
                       .withColumn("month", month("datetime"))\
                       .withColumn("year", year("datetime"))\
                       .withColumn("weekday", date_format('datetime', 'E'))\
                       .select(col("timestamp"),
                               col("hour"), col("day"), col("week"),
                               col("month"), col("year"), col("weekday"))

In [11]:
time_table.show(n=5, truncate=False)

+---------+----+---+----+-----+----+-------+
|timestamp|hour|day|week|month|year|weekday|
+---------+----+---+----+-----+----+-------+
|1515291  |12  |18 |3   |1    |1970|Sun    |
|1515605  |13  |18 |3   |1    |1970|Sun    |
|1515726  |13  |18 |3   |1    |1970|Sun    |
|1515788  |13  |18 |3   |1    |1970|Sun    |
|1515969  |13  |18 |3   |1    |1970|Sun    |
+---------+----+---+----+-----+----+-------+
only showing top 5 rows



In [12]:
# product_table.count()