In [1]:
import pandas as pd 
import numpy as np
import sqlalchemy
import pyodbc

import findspark
findspark.init()
findspark.find()

from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.window as w
import pyspark.sql.types as t
import pyspark.pandas as ps



In [2]:
spark = SparkSession.builder.appName('TheVoice-ETL').getOrCreate()

# Part 1: MRR - Load all the data from the data files into df's

In [5]:
# Load into dataframes 
call_type_mrr =         spark.read.parquet(r'C:\Users\alex\Desktop\_desktop\TheVoice - PySpark\TheVoice - parquet_files\call_type.parquet')
countries_mrr =         spark.read.parquet(r'C:\Users\alex\Desktop\_desktop\TheVoice - PySpark\TheVoice - parquet_files\countries.parquet')
customer_mrr =          spark.read.parquet(r'C:\Users\alex\Desktop\_desktop\TheVoice - PySpark\TheVoice - parquet_files\customer.parquet')
customer_invoice_mrr =  spark.read.parquet(r'C:\Users\alex\Desktop\_desktop\TheVoice - PySpark\TheVoice - parquet_files\customer_invoice.parquet')
customer_lines_mrr =    spark.read.parquet(r'C:\Users\alex\Desktop\_desktop\TheVoice - PySpark\TheVoice - parquet_files\customer_lines.parquet')
opfileopp_mrr =         spark.read.parquet(r'C:\Users\alex\Desktop\_desktop\TheVoice - PySpark\TheVoice - parquet_files\pfileopp.parquet')
package_catalog_mrr =   spark.read.parquet(r'C:\Users\alex\Desktop\_desktop\TheVoice - PySpark\TheVoice - parquet_files\package_catalog.parquet')
usage_main_mrr =        spark.read.parquet(r'C:\Users\alex\Desktop\_desktop\TheVoice - PySpark\TheVoice - parquet_files\usage_main.parquet')
xxCountryType_mrr =     spark.read.parquet(r'C:\Users\alex\Desktop\_desktop\TheVoice - PySpark\TheVoice - parquet_files\xxCountryType.parquet')

# Part 2: Staging - Perform the necessary transformations according to the S2T

In [6]:
# DimCallTypes staging

# variable set
price_per_minute = 0.5

# window for the key column
windowSpec_callType = w.Window.orderBy('call_type_code')

# final df 
DimCallTypes_stg = call_type_mrr.withColumn('KeyCallType', f.row_number().over(windowSpec_callType) + 1000)\
    .withColumnRenamed('call_type_code', 'DescCallTypeCode')\
    .withColumnRenamed('call_type_desc', 'DescCallType')\
    .withColumn('DescFullCallType', f.concat_ws('-', f.col('DescCallTypeCode'), f.col('DescCallType')))\
    .withColumn('DescCallTypePriceCategory', f.when(f.col('priceperminuter') > price_per_minute, 'Discounted Price').otherwise('Normal Price'))\
    .withColumnRenamed('call_type', 'DescCallTypeCategory')\
    .select(
        'KeyCallType', 
        'DescCallTypeCode', 
        'DescCallType', 
        'DescFullCallType', 
        'DescCallTypePriceCategory', 
        'DescCallTypeCategory'
    )#.show()
DimCallTypes_stg.show(5)

+-----------+----------------+------------------+--------------------+-------------------------+--------------------+
|KeyCallType|DescCallTypeCode|      DescCallType|    DescFullCallType|DescCallTypePriceCategory|DescCallTypeCategory|
+-----------+----------------+------------------+--------------------+-------------------------+--------------------+
|       1001|              3W|    Three-way call|   3W-Three-way call|         Discounted Price|               Voice|
|       1002|              AC|Audio Conferencing|AC-Audio Conferen...|             Normal Price|               Voice|
|       1003|            CALL|     Cellular Call|  CALL-Cellular Call|         Discounted Price|               Voice|
|       1004|              CF|   Call Forwarding|  CF-Call Forwarding|         Discounted Price|               Voice|
|       1005|              CW|      Call Waiting|     CW-Call Waiting|         Discounted Price|               Voice|
+-----------+----------------+------------------+-------

In [7]:
# DimCountries staging

# select proper column and rename the country code column
xxCountryType_mrr = xxCountryType_mrr.select('COUNTRY_CODE', 'COUNTRY_PRE').withColumnRenamed('COUNTRY_CODE','COUNTRY_CODE2')

# final df
DimCountries_stg = countries_mrr.select('COUNTRY_CODE', 'REGION', 'AREA').join(
    xxCountryType_mrr,
    countries_mrr['COUNTRY_CODE'] == xxCountryType_mrr['COUNTRY_CODE2'],
    how='inner')\
    .withColumnRenamed('COUNTRY_PRE', 'KeyCountry')\
    .withColumnRenamed('COUNTRY_CODE', 'DescCountry')\
    .withColumnRenamed('REGION', 'DescRegion')\
    .withColumnRenamed('AREA', 'DescArea')\
    .select('KeyCountry', 'DescCountry', 'DescRegion', 'DescArea')#.show(5)

DimCountries_stg.show(5)

+----------+-----------+----------+-----------------+
|KeyCountry|DescCountry|DescRegion|         DescArea|
+----------+-----------+----------+-----------------+
|        93|Afghanistan|      Asia|       South Asia|
|       355|    Albania|    Europe|South East Europe|
|       213|    Algeria|    Africa|  Northern Africa|
|       376|    Andorra|    Europe|South West Europe|
|       244|     Angola|    Africa|  Southern Africa|
+----------+-----------+----------+-----------------+
only showing top 5 rows



In [8]:
# DimPackageCatalog staging
DimPackageCatalog_stg = package_catalog_mrr.withColumnRenamed('PACKAGE_NUM','KeyPackage')\
    .withColumnRenamed('pack_desc', 'DescPackage')\
    .withColumnRenamed('createdate', 'DatePackageCreation')\
    .withColumnRenamed('enddate', 'DatePackageEnd')\
    .withColumn('DescPackageStatus', f.when(f.col('status') == 1, 'Active').otherwise('Inactive'))\
    .withColumn('CodePackageActivitiesDays', f.date_diff(f.col('DatePackageEnd'), f.col('DatePackageCreation')))

DimPackageCatalog_stg.show(5)

+----------+--------------------+--------------+------+---------+--------------------+--------------------+--------------------+-----------------+-------------------------+
|KeyPackage| DatePackageCreation|DatePackageEnd|status|pack_type|         DescPackage|         insert_date|         update_date|DescPackageStatus|CodePackageActivitiesDays|
+----------+--------------------+--------------+------+---------+--------------------+--------------------+--------------------+-----------------+-------------------------+
|         1|2014-01-15 16:28:...|          NULL|     1|   FAMILY|pay much more get...|2014-01-15 16:28:...|2014-01-15 16:28:...|           Active|                     NULL|
|         2|2014-01-15 16:28:...|          NULL|     1|  FRIENDS|friends pay more ...|2014-01-15 16:28:...|2014-01-15 16:28:...|           Active|                     NULL|
|         3|2014-01-15 16:28:...|          NULL|     1| BUSINESS|business man pay ...|2014-01-15 16:28:...|2014-01-15 16:28:...|       

In [9]:
# DimOperators staging
DimOperators_stg = opfileopp_mrr.withColumnRenamed('OPCCC', 'KeyOperator')\
    .withColumn('DescOperator', f.concat_ws('-', f.col('prepre').cast('string'), f.col('OPDDD').cast('string')))\
    .withColumnRenamed('prepre', 'DescKeyPrefix')\
    .select('KeyOperator', 'DescOperator', 'DescKeyPrefix')

DimOperators_stg.show(5)

+-----------+-------------+-------------+
|KeyOperator| DescOperator|DescKeyPrefix|
+-----------+-------------+-------------+
|         50|    50-פלאפון|           50|
|         52|     52-סלקום|           52|
|         53|53-הוט מובייל|           53|
|         54|     54-אורנג|           54|
|         58|58-גולן טלקום|           58|
+-----------+-------------+-------------+



In [10]:
# DimCustomers staging
DimCustomers_stg = customer_mrr.join(customer_lines_mrr.select('PHONE_NO', 'DESC'), 
        customer_mrr['CUST_NUMBER'] == customer_lines_mrr['PHONE_NO'], how='inner')\
    .withColumnRenamed('customer_id', 'KeyCustomer')\
    .withColumnRenamed('DESC', 'DescCusomterPackage')\
    .withColumn( 'CustomerLineOperatorPrefixForJoin', f.when(f.length(f.col('PHONE_NO').cast('string')) == 12, f.substr(f.col('PHONE_NO').cast('string'), f.lit(4), f.lit(2)))\
        .otherwise(f.substr(f.col('PHONE_NO').cast('string'), f.lit(2), f.lit(3))))\
    .withColumn('CustomercountryPrefixForJoin', f.when(f.length(f.col('PHONE_NO').cast('string')) == 12,f.substr(f.col('PHONE_NO').cast('string'), f.lit(1), f.lit(3)))\
        .otherwise(f.substr(f.col('PHONE_NO').cast('string'), f.lit(1), f.lit(1))))\
    .withColumnRenamed('cust_name', 'DescCustomerName')\
    .withColumnRenamed('address', 'DescCustomerAddress')\
    .join(opfileopp_mrr.select('OPCCC', 'OPDDD'), f.col('CustomerLineOperatorPrefixForJoin').cast('int') == f.col('OPCCC').cast('int'), how='left')\
    .withColumn('operatorName', f.when(f.isnull(f.col('OPDDD')), 'Unknown').otherwise(f.col('OPDDD')))\
    .withColumnRenamed('operatorName', 'DescCustomerLineOperator')\
    .join(xxCountryType_mrr, f.col('CustomerCountryPrefixForJoin') == f.col('COUNTRY_PRE'), how='left')\
    .withColumn('DescCustomerLineCountry', f.when(f.isnull('COUNTRY_CODE2'), 'Unknown').otherwise(f.col('COUNTRY_CODE2')))\
    .select(
        'KeyCustomer', 
        'DescCustomerLineOperator', 
        'DescCustomerLineCountry', 
        'DescCustomerName', 
        'DescCustomerAddress', 
        'DescCusomterPackage'
    )

DimCustomers_stg.show(5)

+-----------+------------------------+-----------------------+-----------------+-------------------+-------------------+
|KeyCustomer|DescCustomerLineOperator|DescCustomerLineCountry| DescCustomerName|DescCustomerAddress|DescCusomterPackage|
+-----------+------------------------+-----------------------+-----------------+-------------------+-------------------+
|          1|                   אורנג|                 Israel|     Eugene Huang|         2243 W St.|    FAMILY packages|
|          2|                   אורנג|                 Israel|     Ruben Torres|   5844 Linden Land|    FAMILY packages|
|          3|                   אורנג|                 Israel|      Christy Zhu|   1825 Village Pl.|    FAMILY packages|
|          4|                   אורנג|                 Israel|Elizabeth Johnson|7553 Harness Circle|    FAMILY packages|
|          5|                   אורנג|                 Israel|       Julio Ruiz|7305 Humphrey Drive|    FAMILY packages|
+-----------+-------------------

In [11]:
# DimCallOriginType staging
DimCallOriginType_stg = usage_main_mrr.withColumn('DescCallOriginType',
    f.when(f.col('CELL_ORIGIN') == 1, 'Cellular Call')\
    .when(f.col('CELL_ORIGIN') == 0 , 'Line Call').otherwise('Unknown'))\
    .withColumnRenamed('CELL_ORIGIN', 'KeyCallOriginType')\
    .select('KeyCallOriginType', 'DescCallOriginType')

DimCallOriginType_stg.show(5)

+-----------------+------------------+
|KeyCallOriginType|DescCallOriginType|
+-----------------+------------------+
|                1|     Cellular Call|
|                0|         Line Call|
|                0|         Line Call|
|                1|     Cellular Call|
|                0|         Line Call|
+-----------------+------------------+
only showing top 5 rows



In [12]:
# DimDate staging
start_date = usage_main_mrr.select(f.min(f.col('CALL_DATETIME'))).first()[0]
end_date = usage_main_mrr.select(f.max(f.col('CALL_DATETIME'))).first()[0]

DimDate_pandas = pd.DataFrame({'FullDate': pd.date_range(start= start_date, end= end_date)})
DimDate_stg = spark.createDataFrame(DimDate_pandas)

DimDate_stg = DimDate_stg.withColumn('month_string',
    f.when(f.month(f.col('FullDate')) < 10, f.concat(f.lit('0'), f.month(f.col('FullDate')))).otherwise(f.month(f.col('FullDate'))))\
    .withColumn('day_string', f.when(f.day(f.col('FullDate')) < 10, f.concat(f.lit('0'), f.day(f.col('FullDate')))).otherwise(f.day(f.col('FullDate'))))\
    .withColumn('KeyDate', f.concat(f.year(f.col('FullDate')), f.col('month_string'), f.col('day_string')))\
    .withColumn('KeyYear',f.year(f.col('FullDate')))\
    .withColumn('KeyMonth', f.concat(f.year('FullDate'), f.col('month_string')))\
    .withColumn('CodeMonth', f.month(f.col('FullDate')))\
    .withColumn('DescMonth', f.date_format(f.col('FullDate'), 'MMMM'))\
    .withColumn('CodeDayInWeek', f.day(f.col('FullDate')))\
    .withColumn('DescDayInWeek', f.date_format(f.col('FullDate'), 'EEEE'))\
    .select(
        'KeyDate', 
        'FullDate', 
        'KeyYear', 
        'KeyMonth', 
        'CodeMonth', 
        'DescMonth', 
        'CodeDayInWeek', 
        'DescDayInWeek'
    )#.show(5)

DimDate_stg.show(5)

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


+--------+-------------------+-------+--------+---------+---------+-------------+-------------+
| KeyDate|           FullDate|KeyYear|KeyMonth|CodeMonth|DescMonth|CodeDayInWeek|DescDayInWeek|
+--------+-------------------+-------+--------+---------+---------+-------------+-------------+
|20150908|2015-09-08 00:00:00|   2015|  201509|        9|September|            8|      Tuesday|
|20150909|2015-09-09 00:00:00|   2015|  201509|        9|September|            9|    Wednesday|
|20150910|2015-09-10 00:00:00|   2015|  201509|        9|September|           10|     Thursday|
|20150911|2015-09-11 00:00:00|   2015|  201509|        9|September|           11|       Friday|
|20150912|2015-09-12 00:00:00|   2015|  201509|        9|September|           12|     Saturday|
+--------+-------------------+-------+--------+---------+---------+-------------+-------------+
only showing top 5 rows



In [13]:
# FactUsage staging
FactUsage_stg = usage_main_mrr.join(customer_mrr.select(f.col('customer_id')),
    f.col('CALL_NO') == f.col('customer_id'), how='inner')\
    .withColumnRenamed('CALL_NO', 'CallId')\
    .withColumnRenamed('CUST_ID', 'KeyCustomer')\
    .join(DimCallTypes_stg.select('KeyCallType', 'DescCallTypeCode'),
        f.col('CALL_TYPE') == f.col('DescCallTypeCode'), how='inner')\
    .withColumn('CALLING_NO_operator',
        f.when(f.length(f.col('CALLING_NO')) == 12, f.substr(f.col('CALLING_NO'), f.lit(4), f.lit(2))).otherwise(f.substr(f.col('CALLING_NO'), f.lit(2), f.lit(3))))\
    .join(opfileopp_mrr.select('OPCCC', 'OPDDD'),f.col('CALLING_NO_operator') == f.col('OPCCC'), how='left')\
    .withColumn('KeyOriginOperator',f.when(f.isnull(f.col('OPDDD')), f.lit(-1)).otherwise(f.col('OPDDD')))\
    .withColumn('CALLING_NO_country',f.when(f.length(f.col('CALLING_NO')) == 12, f.substr(f.col('CALLING_NO'), f.lit(1), f.lit(3))).otherwise(f.substr(f.col('CALLING_NO'), f.lit(1), f.lit(1))))\
    .join(xxCountryType_mrr, f.col('CALLING_NO_country') == f.col('COUNTRY_PRE'), how='left')\
    .withColumn('KeyOriginCountry',f.when(f.isnull(f.col('COUNTRY_CODE2')), f.lit(-1)).otherwise(f.col('COUNTRY_CODE2')))\
    .withColumn('DES_NO_operator',f.when(f.length(f.col('DES_NO')) == 12, f.substr(f.col('DES_NO'), f.lit(4), f.lit(2))).otherwise(f.substr(f.col('DES_NO'), f.lit(2), f.lit(3))))\
    .join(opfileopp_mrr.select(f.col('OPCCC').alias('OPCCC_DES_NO'), f.col('OPDDD').alias('OPDDD_DES_NO')),f.col('DES_NO_operator') == f.col('OPCCC_DES_NO'), how='left')\
    .withColumn('KeyDestinationOperator', f.when(f.isnull(f.col('OPDDD_DES_NO')), f.lit(-1)).otherwise(f.col('OPDDD_DES_NO')))\
    .join(customer_lines_mrr, f.col('CALLING_NO') == f.col('PHONE_NO'), how='inner')\
    .join(DimPackageCatalog_stg, f.col('TYPE') == f.col('pack_type'), how='inner')\
    .withColumnRenamed('CELL_ORIGIN', 'KeyCallOriginType')\
    .withColumn('month_string',  f.when(f.month(f.col('CALL_DATETIME')) < 10, f.concat(f.lit('0'), f.month(f.col('CALL_DATETIME')))).otherwise(f.month(f.col('CALL_DATETIME'))))\
    .withColumn('day_string', f.when(f.day(f.col('CALL_DATETIME')) < 10, f.concat(f.lit('0'), f.day(f.col('CALL_DATETIME')))).otherwise(f.day(f.col('CALL_DATETIME'))))\
    .withColumn('KeyCallDate', f.concat(f.year(f.col('CALL_DATETIME')), f.col('month_string'), f.col('day_string')))\
    .withColumnRenamed('DURATION', 'Duration')\
    .withColumn('BillableDuration', f.col('Duration') - f.col('numberoffreeminutes'))\
    .withColumnRenamed('RATED_AMNT', 'Amount')\
    .withColumn('BillableAmount', f.when( (f.col('discountpct') == 0) | (f.isnull(f.col('discountpct'))), f.col('Amount')).otherwise(f.col('Amount')*f.col('discountpct')/(100)))\
    .select(
        'CallId', 
        'KeyCustomer', 
        'KeyCallType', 
        'KeyOriginOperator', 
        'KeyOriginCountry', 
        'KeyDestinationOperator', 
        'KeyPackage',
        'KeyCallDate',
        'Duration',
        'BillableDuration',
        'Amount',
        'BillableAmount'
        )#.show(5)

FactUsage_stg.show(5)

+------+-----------+-----------+-----------------+----------------+----------------------+----------+-----------+--------+----------------+------+--------------+
|CallId|KeyCustomer|KeyCallType|KeyOriginOperator|KeyOriginCountry|KeyDestinationOperator|KeyPackage|KeyCallDate|Duration|BillableDuration|Amount|BillableAmount|
+------+-----------+-----------+-----------------+----------------+----------------------+----------+-----------+--------+----------------+------+--------------+
|     1|       8544|       1036|            סלקום|          Israel|                    -1|         1|   20190905|       4|             3.0|     0|           0.0|
|     2|      10205|       1033|               -1|   United States|                    -1|         1|   20190905|       0|             0.0|     0|           0.0|
|     3|      18147|       1036|               -1|   United States|                    -1|         3|   20190905|       1|             1.0|     0|           0.0|
|     4|       9327|       1

# Part 3: DWH - create incremetal load queries to load the transformed data

In [14]:
# creating false target df's for i=0 (keep schema)
DimCallTypes = DimCallTypes_stg.filter(f.lit(2) == f.lit(1))
DimCountries = DimCountries_stg.filter(f.lit(2) == f.lit(1))
DimPackageCatalog = DimPackageCatalog_stg.filter(f.lit(2) == f.lit(1))
DimOperators = DimOperators_stg.filter(f.lit(2) == f.lit(1))
DimCustomers = DimCustomers_stg.filter(f.lit(2) == f.lit(1))
DimCallOriginType = DimCallOriginType_stg.filter(f.lit(2) == f.lit(1))
DimDate = DimDate_stg.filter(f.lit(2) == f.lit(1))
FactUsage = FactUsage_stg.filter(f.lit(2) == f.lit(1))

In [15]:
# DimCallType incremental load
DimCallTypes.printSchema()

# get all new records
DimCallTypes_insert = DimCallTypes_stg.alias('src').join(DimCallTypes.alias('trg'), f.col('src.KeyCallType') == f.col('trg.KeyCallType'), how='leftanti')\
    .select(
        f.col('src.KeyCallType'),
        f.col('src.DescCallTypeCode'),
        f.col('src.DescCallType'),
        f.col('src.DescFullCallType'),
        f.col('src.DescCallTypePriceCategory'),
        f.col('src.DescCallTypeCategory')
    )

# get all records to update
DimCallTypes_updated = DimCallTypes_stg.alias('src').join(DimCallTypes.alias('trg'), f.col('src.KeyCallType') == f.col('trg.KeyCallType'), how='inner')\
    .select(
        f.col('src.KeyCallType'),
        f.col('src.DescCallTypeCode'),
        f.col('src.DescCallType'),
        f.col('src.DescFullCallType'),
        f.col('src.DescCallTypePriceCategory'),
        f.col('src.DescCallTypeCategory')
    )

# add all the new records
DimCallTypes = DimCallTypes.union(DimCallTypes_insert).union(DimCallTypes_updated).distinct()
DimCallTypes.show(5)

root
 |-- KeyCallType: integer (nullable = false)
 |-- DescCallTypeCode: string (nullable = true)
 |-- DescCallType: string (nullable = true)
 |-- DescFullCallType: string (nullable = false)
 |-- DescCallTypePriceCategory: string (nullable = false)
 |-- DescCallTypeCategory: string (nullable = true)

+-----------+----------------+------------------+--------------------+-------------------------+--------------------+
|KeyCallType|DescCallTypeCode|      DescCallType|    DescFullCallType|DescCallTypePriceCategory|DescCallTypeCategory|
+-----------+----------------+------------------+--------------------+-------------------------+--------------------+
|       1001|              3W|    Three-way call|   3W-Three-way call|         Discounted Price|               Voice|
|       1002|              AC|Audio Conferencing|AC-Audio Conferen...|             Normal Price|               Voice|
|       1003|            CALL|     Cellular Call|  CALL-Cellular Call|         Discounted Price|            

In [16]:
# DimCountries incremental load
DimCountries.printSchema()

# get all new records
DimCountries_insert = DimCountries_stg.alias('src').join(DimCountries.alias('trg'), f.col('src.KeyCountry') == f.col('trg.KeyCountry'), how='leftanti')\
    .select(
        f.col('src.KeyCountry'),
        f.col('src.DescCountry'),
        f.col('src.DescRegion'),
        f.col('src.DescArea')
    )#.show()

# get all updated records
DimCountries_updated = DimCountries_stg.alias('src').join(DimCountries.alias('trg'), f.col('src.KeyCountry') == f.col('trg.KeyCountry'), how='inner')\
    .select(
        f.col('src.KeyCountry'),
        f.col('src.DescCountry'),
        f.col('src.DescRegion'),
        f.col('src.DescArea')
    )

# add all new recoreds
DimCountries = DimCountries.union(DimCountries_insert).union(DimCountries_updated).distinct()
DimCountries.show(5)

root
 |-- KeyCountry: long (nullable = true)
 |-- DescCountry: string (nullable = true)
 |-- DescRegion: string (nullable = true)
 |-- DescArea: string (nullable = true)



+----------+-----------+----------+---------------+
|KeyCountry|DescCountry|DescRegion|       DescArea|
+----------+-----------+----------+---------------+
|        66|   Thailand|      Asia|South East Asia|
|       252|    Somalia|    Africa| Eastern Africa|
|       994| Azerbaijan|      Asia|South West Asia|
|       237|   Cameroon|    Africa| Western Africa|
|       961|    Lebanon|      Asia|South West Asia|
+----------+-----------+----------+---------------+
only showing top 5 rows



In [17]:
# DimPackageCatalog incremental load
DimPackageCatalog.printSchema()

# gel all new records
DimPackageCatalog_insert = DimPackageCatalog_stg.alias('src').join(DimPackageCatalog.alias('trg'), f.col('src.KeyPackage') == f.col('trg.KeyPackage'), how='leftanti')\
    .select(
        f.col('src.KeyPackage'),
        f.col('src.DatePackageCreation'),
        f.col('src.DatePackageEnd'),
        f.col('src.status'),
        f.col('src.pack_type'),
        f.col('src.DescPackage'),
        f.col('src.insert_date'),
        f.col('src.update_date'),
        f.col('src.DescPackageStatus'),
        f.col('src.CodePackageActivitiesDays')
    )#.show(5)

# get all updated records
DimPackageCatalog_update = DimPackageCatalog_stg.alias('src').join(DimPackageCatalog.alias('trg'), f.col('src.KeyPackage') == f.col('trg.KeyPackage'), how='inner')\
    .select(
        f.col('src.KeyPackage'),
        f.col('src.DatePackageCreation'),
        f.col('src.DatePackageEnd'),
        f.col('src.status'),
        f.col('src.pack_type'),
        f.col('src.DescPackage'),
        f.col('src.insert_date'),
        f.col('src.update_date'),
        f.col('src.DescPackageStatus'),
        f.col('src.CodePackageActivitiesDays')
    )#.show(5)

# add all new records
DimPackageCatalog = DimPackageCatalog.union(DimPackageCatalog_insert).union(DimPackageCatalog_update).distinct()
DimPackageCatalog.show(5)

root
 |-- KeyPackage: long (nullable = true)
 |-- DatePackageCreation: string (nullable = true)
 |-- DatePackageEnd: string (nullable = true)
 |-- status: long (nullable = true)
 |-- pack_type: string (nullable = true)
 |-- DescPackage: string (nullable = true)
 |-- insert_date: string (nullable = true)
 |-- update_date: string (nullable = true)
 |-- DescPackageStatus: string (nullable = false)
 |-- CodePackageActivitiesDays: integer (nullable = true)

+----------+--------------------+--------------+------+---------+--------------------+--------------------+--------------------+-----------------+-------------------------+
|KeyPackage| DatePackageCreation|DatePackageEnd|status|pack_type|         DescPackage|         insert_date|         update_date|DescPackageStatus|CodePackageActivitiesDays|
+----------+--------------------+--------------+------+---------+--------------------+--------------------+--------------------+-----------------+-------------------------+
|         4|2012-01-22 0

In [18]:
# DimOperators incremental load
DimOperators.printSchema()

# get all new records
DimOperators_insert = DimOperators_stg.alias('src').join(DimOperators.alias('trg'), f.col('src.KeyOperator') == f.col('trg.KeyOperator'), how='leftanti')\
    .select(
        f.col('src.KeyOperator'),
        f.col('src.DescOperator'),
        f.col('src.DescKeyPrefix')
    )#.show(5)

# get all updated records
DimOperators_updated = DimOperators_stg.alias('src').join(DimOperators.alias('trg'), f.col('src.KeyOperator') == f.col('trg.KeyOperator'), how='inner')\
    .select(
        f.col('src.KeyOperator'),
        f.col('src.DescOperator'),
        f.col('src.DescKeyPrefix')
    )#.show(5)

# add all records
DimOperators = DimOperators.union(DimOperators_insert).union(DimOperators_updated).distinct()
DimOperators.show(5)

root
 |-- KeyOperator: long (nullable = true)
 |-- DescOperator: string (nullable = false)
 |-- DescKeyPrefix: long (nullable = true)

+-----------+-------------+-------------+
|KeyOperator| DescOperator|DescKeyPrefix|
+-----------+-------------+-------------+
|         54|     54-אורנג|           54|
|         52|     52-סלקום|           52|
|         50|    50-פלאפון|           50|
|         58|58-גולן טלקום|           58|
|         53|53-הוט מובייל|           53|
+-----------+-------------+-------------+



In [19]:
# DimCustomers incremental load
DimCustomers.printSchema()

# get all new records
DimCustomers_insert = DimCustomers_stg.alias('src').join(DimCustomers.alias('trg'), f.col('src.KeyCustomer') == f.col('trg.KeyCustomer'), how='leftanti')\
    .select(
        f.col('src.KeyCustomer'),
        f.col('src.DescCustomerLineOperator'),
        f.col('src.DescCustomerLineCountry'),
        f.col('src.DescCustomerName'),
        f.col('src.DescCustomerAddress'),
        f.col('src.DescCusomterPackage')
    )#.show(5)

# get all updated records
DimCustomers_updated = DimCustomers_stg.alias('src').join(DimCustomers.alias('trg'), f.col('src.KeyCustomer') == f.col('trg.KeyCustomer'), how='inner')\
    .select(
        f.col('src.KeyCustomer'),
        f.col('src.DescCustomerLineOperator'),
        f.col('src.DescCustomerLineCountry'),
        f.col('src.DescCustomerName'),
        f.col('src.DescCustomerAddress'),
        f.col('src.DescCusomterPackage')
    )#.show(5)

DimCustomers = DimCustomers.union(DimCustomers_insert).union(DimCustomers_updated).distinct()
DimCustomers.show(5)

root
 |-- KeyCustomer: long (nullable = true)
 |-- DescCustomerLineOperator: string (nullable = true)
 |-- DescCustomerLineCountry: string (nullable = true)
 |-- DescCustomerName: string (nullable = true)
 |-- DescCustomerAddress: string (nullable = true)
 |-- DescCusomterPackage: string (nullable = true)

+-----------+------------------------+-----------------------+----------------+--------------------+-------------------+
|KeyCustomer|DescCustomerLineOperator|DescCustomerLineCountry|DescCustomerName| DescCustomerAddress|DescCusomterPackage|
+-----------+------------------------+-----------------------+----------------+--------------------+-------------------+
|        525|                   אורנג|                 Israel|   Latasha Munoz|    4799 Buena Vista|    FAMILY packages|
|        759|                   אורנג|                 Israel|Kimberly Stewart| 6289 Via Del Verdes|    FAMILY packages|
|        764|                   אורנג|                 Israel|    Christy Zhou|      75

In [20]:
# DimCallOriginType incremental load
DimCallOriginType.printSchema()

# get all new records
DimCallOriginType_insert = DimCallOriginType_stg.alias('src').join(DimCallOriginType.alias('trg'), f.col('src.KeyCallOriginType') == f.col('trg.KeyCallOriginType'), how='leftanti')\
    .select(
        f.col('src.KeyCallOriginType'),
        f.col('src.DescCallOriginType')
    )#.show(5)

# get all updated records
DimCallOriginType_updated = DimCallOriginType_stg.alias('src').join(DimCallOriginType.alias('trg'), f.col('src.KeyCallOriginType') == f.col('trg.KeyCallOriginType'), how='inner')\
    .select(
        f.col('src.KeyCallOriginType'),
        f.col('src.DescCallOriginType')
    )#.show(5)

# add all new records
DimCallOriginType = DimCallOriginType.union(DimCallOriginType_insert).union(DimCallOriginType_updated).distinct()
DimCallOriginType.show(5)

root
 |-- KeyCallOriginType: long (nullable = true)
 |-- DescCallOriginType: string (nullable = false)

+-----------------+------------------+
|KeyCallOriginType|DescCallOriginType|
+-----------------+------------------+
|                1|     Cellular Call|
|                0|         Line Call|
+-----------------+------------------+



In [21]:
# DimDate incremental load
DimDate.printSchema()

# get all new records
DimDate_insert = DimDate_stg.alias('src').join(DimDate.alias('trg'), f.col('src.KeyDate') == f.col('trg.KeyDate'), how='leftanti')\
    .select(
        f.col('src.KeyDate'),
        f.col('src.FullDate'),
        f.col('src.KeyYear'),
        f.col('src.KeyMonth'),
        f.col('src.CodeMonth'),
        f.col('src.DescMonth'),
        f.col('src.CodeDayInWeek'),
        f.col('src.DescDayInWeek'),
    )#.show(5)

# get all updated records
DimDate_updated = DimDate_stg.alias('src').join(DimDate.alias('trg'), f.col('src.KeyDate') == f.col('trg.KeyDate'), how='inner')\
    .select(
        f.col('src.KeyDate'),
        f.col('src.FullDate'),
        f.col('src.KeyYear'),
        f.col('src.KeyMonth'),
        f.col('src.CodeMonth'),
        f.col('src.DescMonth'),
        f.col('src.CodeDayInWeek'),
        f.col('src.DescDayInWeek'),
    )#.show(5)

# add all records
DimDate = DimDate.union(DimDate_insert).union(DimDate_updated).distinct()
DimDate.show(5)

root
 |-- KeyDate: string (nullable = true)
 |-- FullDate: timestamp (nullable = true)
 |-- KeyYear: integer (nullable = true)
 |-- KeyMonth: string (nullable = true)
 |-- CodeMonth: integer (nullable = true)
 |-- DescMonth: string (nullable = true)
 |-- CodeDayInWeek: integer (nullable = true)
 |-- DescDayInWeek: string (nullable = true)

+--------+-------------------+-------+--------+---------+---------+-------------+-------------+
| KeyDate|           FullDate|KeyYear|KeyMonth|CodeMonth|DescMonth|CodeDayInWeek|DescDayInWeek|
+--------+-------------------+-------+--------+---------+---------+-------------+-------------+
|20151101|2015-11-01 00:00:00|   2015|  201511|       11| November|            1|       Sunday|
|20150924|2015-09-24 00:00:00|   2015|  201509|        9|September|           24|     Thursday|
|20151206|2015-12-06 00:00:00|   2015|  201512|       12| December|            6|       Sunday|
|20160120|2016-01-20 00:00:00|   2016|  201601|        1|  January|           20| 

In [22]:
# FactUsage incremental load
FactUsage.printSchema()

# get all new records
FactUsage_insert = FactUsage_stg.alias('src').join(FactUsage.alias('trg'), f.col('src.CallId') == f.col('trg.CallId'), how='leftanti')\
    .select(
        f.col('src.CallId'),
        f.col('src.KeyCustomer'),
        f.col('src.KeyCallType'),
        f.col('src.KeyOriginOperator'),
        f.col('src.KeyOriginCountry'),
        f.col('src.KeyDestinationOperator'),
        f.col('src.KeyPackage'),
        f.col('src.KeyCallDate'),
        f.col('src.Duration'),
        f.col('src.BillableDuration'),
        f.col('src.Amount'),
        f.col('src.BillableAmount')
    )#.show(5)

# get all updated records
FactUsage_updated = FactUsage_stg.alias('src').join(FactUsage.alias('trg'), f.col('src.CallId') == f.col('trg.CallId'), how='inner')\
    .select(
        f.col('src.CallId'),
        f.col('src.KeyCustomer'),
        f.col('src.KeyCallType'),
        f.col('src.KeyOriginOperator'),
        f.col('src.KeyOriginCountry'),
        f.col('src.KeyDestinationOperator'),
        f.col('src.KeyPackage'),
        f.col('src.KeyCallDate'),
        f.col('src.Duration'),
        f.col('src.BillableDuration'),
        f.col('src.Amount'),
        f.col('src.BillableAmount')
    )#.show(5)

FactUsage = FactUsage.union(FactUsage_insert).union(FactUsage_updated).distinct()
FactUsage.show(5)

root
 |-- CallId: long (nullable = true)
 |-- KeyCustomer: long (nullable = true)
 |-- KeyCallType: integer (nullable = false)
 |-- KeyOriginOperator: string (nullable = true)
 |-- KeyOriginCountry: string (nullable = true)
 |-- KeyDestinationOperator: string (nullable = true)
 |-- KeyPackage: long (nullable = true)
 |-- KeyCallDate: string (nullable = true)
 |-- Duration: long (nullable = true)
 |-- BillableDuration: double (nullable = true)
 |-- Amount: long (nullable = true)
 |-- BillableAmount: double (nullable = true)

+------+-----------+-----------+-----------------+----------------+----------------------+----------+-----------+--------+----------------+------+--------------+
|CallId|KeyCustomer|KeyCallType|KeyOriginOperator|KeyOriginCountry|KeyDestinationOperator|KeyPackage|KeyCallDate|Duration|BillableDuration|Amount|BillableAmount|
+------+-----------+-----------+-----------------+----------------+----------------------+----------+-----------+--------+----------------+------+