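# Time Dimension Table Build (`dm_time`)

Reads `rci_network_db.dm_tiempo_hv` from Hive, turns the date into a `yyyyMMdd` integer key, normalizes column types, and loads the result into the Kudu table `rci_network_db.dm_time`.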

## I. Libraries to use

In [1]:
import os

# Absolute install locations of the JDK and the CDH-bundled Spark on this host;
# they must be exported before findspark.init() so it can locate pyspark.
JAVA_HOME_DIR = '/usr/java/jdk1.8.0_162'
SPARK_HOME_DIR = '/opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/spark'
os.environ.update({'JAVA_HOME': JAVA_HOME_DIR, 'SPARK_HOME': SPARK_HOME_DIR})

import findspark
findspark.init()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import HiveContext
import pandasql
import pandas as pd
from pyspark.sql.functions import monotonically_increasing_id 
from pyspark.sql.functions import arrays_zip,explode,explode_outer, col, split, regexp_extract, array_contains, regexp_replace,concat_ws, create_map, create_map, lit
import pyspark.sql.functions as f
from pyspark.sql.functions import size
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import split
from pyspark.sql import functions as F
from pyspark.sql.functions import first
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
from pyspark.sql.types import StringType ,BooleanType
from pyspark.sql.functions import map_keys,map_values
from pyspark.sql.functions import when, avg, col
from pyspark.sql import Window
from datetime import datetime
import hashlib

## II. Configuración de Spark

In [2]:
# Spark configuration for the dm_time load: YARN, queue root.eda, Kryo
# serialization, 4 executors x 5 cores / 8g, and the Kudu connector jar.
conf = (
    SparkConf()
    .setAppName('dm_time')
    .setMaster('yarn')
    .set("spark.yarn.queue", "root.eda")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.executor.instances", "4")
    .set("spark.executor.cores", "5")
    .set("spark.driver.memory", "8g")
    .set("spark.executor.memory", "8g")
    .set("spark.sql.tungsten.enabled", "true")
    .set("spark.io.compression.codec", "snappy")
    .set("spark.sql.crossJoin.enabled", "true")
    # Previously `spark.kryoserializer.buffer.mb`, a deprecated alias for the
    # *initial* Kryo buffer (so 128 MB was allocated up front). The 128 MB value
    # is meant as the upper bound, which is `spark.kryoserializer.buffer.max`.
    .set("spark.kryoserializer.buffer.max", "128m")
    # -1 disables automatic broadcast joins.
    .set("spark.sql.autoBroadcastJoinThreshold", -1)
    # Previously misspelled as `spark.sql.shuffle.partition` (no trailing "s"),
    # which Spark ignores, leaving the default in effect.
    # TODO(review): confirm 2001 is intended; the source table has only a few
    # thousand rows.
    .set("spark.sql.shuffle.partitions", "2001")
    .set("spark.shuffle.compress", "true")
    .set("spark.shuffle.spill.compress", "true")
    .set("spark.jars", "/home/raw_rci/jars/kudu-spark-tools-1.4.0.jar")
)

# Hive support is requested explicitly because the notebook reads Hive tables
# through spark.sql.
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()
sc = spark.sparkContext
# Legacy HiveContext kept for backward compatibility; no cell in this notebook
# references it.
sqlContext = HiveContext(sc)

## III. Procesamiento

### 1. Extracción de la tabla de tiempo temporal (`rci_network_db.dm_tiempo_hv`)

In [3]:
# Load the Hive time table ordered by calendar date and cache it; the count()
# is an action that forces the cache to be materialized right away.
df_dmtimehv = (
    spark.table("rci_network_db.dm_tiempo_hv")
    .orderBy("date")
    .cache()
)
df_read = df_dmtimehv.count()

In [4]:
# Preview the first 20 rows with full column contents.
df_dmtimehv.show(n=20, truncate=False)

+----------+----+-------+-----+-------------+------------+---+-----------+-------------+-----------+
|date      |year|quarter|month|week_of_month|week_of_year|day|day_of_week|day_of_week_s|day_of_year|
+----------+----+-------+-----+-------------+------------+---+-----------+-------------+-----------+
|2019-01-01|2019|1      |1    |1            |1           |1  |2          |Tue          |1          |
|2019-01-02|2019|1      |1    |1            |1           |2  |3          |Wed          |2          |
|2019-01-03|2019|1      |1    |1            |1           |3  |4          |Thu          |3          |
|2019-01-04|2019|1      |1    |1            |1           |4  |5          |Fri          |4          |
|2019-01-05|2019|1      |1    |1            |1           |5  |6          |Sat          |5          |
|2019-01-06|2019|1      |1    |2            |2           |6  |7          |Sun          |6          |
|2019-01-07|2019|1      |1    |2            |2           |7  |1          |Mon          |7  

### 2. Limpieza del campo `date` (se convierte en `date_id` con formato `yyyyMMdd`)

In [5]:
# Build the compact date key by stripping the dashes from `date`
# (e.g. 2019-01-01 -> 20190101); all other attributes pass through unchanged.
passthrough_cols = [
    "year", "quarter", "month", "week_of_month", "week_of_year",
    "day", "day_of_week", "day_of_week_s", "day_of_year",
]
date_key = regexp_replace(col("date"), "-", "").alias("date_id")
df_dmtimehv3 = df_dmtimehv.select(date_key, *passthrough_cols).cache()

In [6]:
# Preview the first 20 rows of the cleaned table, untruncated.
df_dmtimehv3.show(n=20, truncate=False)

+--------+----+-------+-----+-------------+------------+---+-----------+-------------+-----------+
|date_id |year|quarter|month|week_of_month|week_of_year|day|day_of_week|day_of_week_s|day_of_year|
+--------+----+-------+-----+-------------+------------+---+-----------+-------------+-----------+
|20190101|2019|1      |1    |1            |1           |1  |2          |Tue          |1          |
|20190102|2019|1      |1    |1            |1           |2  |3          |Wed          |2          |
|20190103|2019|1      |1    |1            |1           |3  |4          |Thu          |3          |
|20190104|2019|1      |1    |1            |1           |4  |5          |Fri          |4          |
|20190105|2019|1      |1    |1            |1           |5  |6          |Sat          |5          |
|20190106|2019|1      |1    |2            |2           |6  |7          |Sun          |6          |
|20190107|2019|1      |1    |2            |2           |7  |1          |Mon          |7          |
|20190108|

### 3. Conversión a DataFrame de pandas para cambiar los tipos de datos

In [7]:
# Collect the cleaned time table to the driver as a pandas DataFrame
# (it is small: a few thousand rows) so column dtypes can be adjusted.
dfp = df_dmtimehv3.toPandas()

In [8]:
# First five rows of the collected pandas frame.
dfp.head(n=5)

Unnamed: 0,date_id,year,quarter,month,week_of_month,week_of_year,day,day_of_week,day_of_week_s,day_of_year
0,20190101,2019,1,1,1,1,1,2,Tue,1
1,20190102,2019,1,1,1,1,2,3,Wed,2
2,20190103,2019,1,1,1,1,3,4,Thu,3
3,20190104,2019,1,1,1,1,4,5,Fri,4
4,20190105,2019,1,1,1,1,5,6,Sat,5


In [9]:
# Inspect dtypes: several numeric-looking columns (date_id, week_of_month, ...)
# arrive as `object` (strings) and are cast to int in the next cell.
dfp.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3651 entries, 0 to 3650
Data columns (total 10 columns):
date_id          3651 non-null object
year             3651 non-null int32
quarter          3651 non-null int64
month            3651 non-null int32
week_of_month    3651 non-null object
week_of_year     3651 non-null object
day              3651 non-null int32
day_of_week      3651 non-null object
day_of_week_s    3651 non-null object
day_of_year      3651 non-null object
dtypes: int32(3), int64(1), object(6)
memory usage: 242.5+ KB


In [10]:
# Columns that arrive as strings (object dtype) but hold integers; go through
# str first so any mixed object values are parsed uniformly before the int cast.
string_int_cols = ['date_id', 'week_of_month', 'week_of_year', 'day_of_week', 'day_of_year']
for int_col in string_int_cols:
    dfp[int_col] = dfp[int_col].astype(str).astype(int)

In [11]:
# Re-check dtypes: the converted columns should now report an integer dtype
# (int64 in the output below); only day_of_week_s remains `object`.
dfp.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3651 entries, 0 to 3650
Data columns (total 10 columns):
date_id          3651 non-null int64
year             3651 non-null int32
quarter          3651 non-null int64
month            3651 non-null int32
week_of_month    3651 non-null int64
week_of_year     3651 non-null int64
day              3651 non-null int32
day_of_week      3651 non-null int64
day_of_week_s    3651 non-null object
day_of_year      3651 non-null int64
dtypes: int32(3), int64(6), object(1)
memory usage: 242.5+ KB


In [12]:
# Spot-check the first five rows after the dtype conversion.
dfp.head(n=5)

Unnamed: 0,date_id,year,quarter,month,week_of_month,week_of_year,day,day_of_week,day_of_week_s,day_of_year
0,20190101,2019,1,1,1,1,1,2,Tue,1
1,20190102,2019,1,1,1,1,2,3,Wed,2
2,20190103,2019,1,1,1,1,3,4,Thu,3
3,20190104,2019,1,1,1,1,4,5,Fri,4
4,20190105,2019,1,1,1,1,5,6,Sat,5


### 4. Conversión de vuelta a DataFrame de Spark

In [13]:
# Rebuild a Spark DataFrame from the typed pandas frame; the schema is inferred
# from the pandas data (no explicit schema is passed).
dfp2_sp = spark.createDataFrame(data=dfp)

In [14]:
# Preview the first 20 rows of the Spark frame that will be written.
dfp2_sp.show(n=20, truncate=False)

+--------+----+-------+-----+-------------+------------+---+-----------+-------------+-----------+
|date_id |year|quarter|month|week_of_month|week_of_year|day|day_of_week|day_of_week_s|day_of_year|
+--------+----+-------+-----+-------------+------------+---+-----------+-------------+-----------+
|20190101|2019|1      |1    |1            |1           |1  |2          |Tue          |1          |
|20190102|2019|1      |1    |1            |1           |2  |3          |Wed          |2          |
|20190103|2019|1      |1    |1            |1           |3  |4          |Thu          |3          |
|20190104|2019|1      |1    |1            |1           |4  |5          |Fri          |4          |
|20190105|2019|1      |1    |1            |1           |5  |6          |Sat          |5          |
|20190106|2019|1      |1    |2            |2           |6  |7          |Sun          |6          |
|20190107|2019|1      |1    |2            |2           |7  |1          |Mon          |7          |
|20190108|

### 5. Escritura en tabla

In [None]:
# Landing on Hive
# Alternative sink, currently disabled: save the table to Hive as Parquet,
# overwriting rci_network_db.tx_stg_time. The active sink is the Kudu write cell.
# dfp2_sp.write.format("parquet").mode("overwrite").saveAsTable("rci_network_db.tx_stg_time")

In [17]:
# Landing on Kudu
# Target: the Kudu table `rci_network_db.dm_time` (the `impala::` prefix is how
# Impala names the Kudu tables it creates).
# NOTE(review): mode "append" against existing keys depends on the connector's
# write operation (kudu-spark typically upserts) — confirm before re-running.
kudu_options = {
    'kudu.master': 'mxtold01dlm01.attdatalake.com.mx:7051',
    'kudu.table': 'impala::rci_network_db.dm_time',
}
(
    dfp2_sp.write
    .format('org.apache.kudu.spark.kudu')
    .options(**kudu_options)
    .mode("append")
    .save()
)

In [None]:
# Shut down Spark. SparkSession.stop() stops the underlying SparkContext (the
# same effect as sc.stop()). It also resets the cached default/active session,
# so a later builder.getOrCreate() starts cleanly instead of returning a stale session.
spark.stop()