# Import HDFSUtils

In [None]:
import sys

# Directory containing the HDFSUtils module imported below (presumably also
# DataFrameUtils — confirm).
# NOTE(review): this is a user-specific absolute path; point it at your own workspace.
UTILS_DIR = '/var/sds/homes/XP96619/workspace/utils/'

# Prepend only when it is not already first, so re-running this cell does not keep
# pushing duplicate entries onto sys.path while the directory still takes priority.
if not sys.path or sys.path[0] != UTILS_DIR:
    sys.path.insert(0, UTILS_DIR)

In [None]:
from HDFSUtils import HDFSUtils

### List the contents of a given path

In [None]:
# List the entries under `path`: per the output below, both files (the _SUCCESS
# marker) and partition directories, each as a full hdfs:// URI string.
# get_content() yields objects exposing getPath(); presumably Hadoop FileStatus — confirm.
path = "/data/raw/pext/data/t_pext_rcc_balance/"
content = HDFSUtils(sc).get_content(path)

# `status` rather than `file`: on Python 2 the comprehension variable leaks into the
# notebook namespace and would shadow the built-in `file`.
[status.getPath().toString() for status in content]

['hdfs://pedaaswork.scmx2p100.isi/data/raw/pext/data/t_pext_rcc_balance/_SUCCESS',
 'hdfs://pedaaswork.scmx2p100.isi/data/raw/pext/data/t_pext_rcc_balance/cutoff_date=20180531',
 'hdfs://pedaaswork.scmx2p100.isi/data/raw/pext/data/t_pext_rcc_balance/cutoff_date=20180630',
 'hdfs://pedaaswork.scmx2p100.isi/data/raw/pext/data/t_pext_rcc_balance/cutoff_date=20180731']

### List only the folders (sub-directories)

In [None]:
# List only the sub-directories of `path`, as full hdfs:// URI strings; files such
# as the _SUCCESS marker are excluded (see the output below).
path = "/data/raw/pext/data/t_pext_rcc_balance/"

HDFSUtils(sc).get_folders(path)

['hdfs://pedaaswork.scmx2p100.isi/data/raw/pext/data/t_pext_rcc_balance/cutoff_date=20180531',
 'hdfs://pedaaswork.scmx2p100.isi/data/raw/pext/data/t_pext_rcc_balance/cutoff_date=20180630',
 'hdfs://pedaaswork.scmx2p100.isi/data/raw/pext/data/t_pext_rcc_balance/cutoff_date=20180731']

### Getting Partitions from a Parquet Source

In [None]:
# All partitions: with no filter, get_date_partitions() returns every partition
# date under `path` as a string, newest first (see the output below).
path = "/data/master/pdco/data/retailBusinessBanking/t_pdco_credit_card_mov/"
HDFSUtils(sc).get_date_partitions(path)

['2020-01-31',
 '2020-01-30',
 '2020-01-29',
 '2020-01-28',
 '2020-01-27',
 '2020-01-21',
 '2019-09-30',
 '2019-09-20',
 '2019-07-31',
 '2019-06-28',
 '2019-05-27',
 '2019-05-26',
 '2019-05-24',
 '2019-05-23',
 '2019-04-30',
 '2019-03-31',
 '2019-02-28',
 '2018-12-13',
 '2018-10-31',
 '2018-08-14',
 '2018-06-08',
 '2018-05-27',
 '2017-09-30',
 '2017-09-29',
 '2017-08-31',
 '2017-07-31',
 '2017-06-30',
 '2017-05-31',
 '2017-04-28',
 '2017-03-31',
 '2017-02-28',
 '2017-01-31',
 '2016-12-31']

In [None]:
# Use partition_number=N to keep only the N most recent partitions
# (here the three newest dates, per the output below).

path = "/data/master/pdco/data/retailBusinessBanking/t_pdco_credit_card_mov/"
HDFSUtils(sc).get_date_partitions(path, partition_number = 3)

['2020-01-31', '2020-01-30', '2020-01-29']

In [None]:
# Restrict to a date range by passing process_date as a two-element list:
# [earliest, latest]. Per the output below, both bounds are inclusive
# ('2018-08-14' and '2020-01-21' are both returned).

path = "/data/master/pdco/data/retailBusinessBanking/t_pdco_credit_card_mov/"
HDFSUtils(sc).get_date_partitions(path, process_date = ["2018-08-14", "2020-01-21"])

['2020-01-21',
 '2019-09-30',
 '2019-09-20',
 '2019-07-31',
 '2019-06-28',
 '2019-05-27',
 '2019-05-26',
 '2019-05-24',
 '2019-05-23',
 '2019-04-30',
 '2019-03-31',
 '2019-02-28',
 '2018-12-13',
 '2018-10-31',
 '2018-08-14']

In [None]:
# Filter by a single cut-off date: with no `operation`, the output below shows the
# partitions on or before process_date (the cut-off date itself is included).

path = "/data/master/pdco/data/retailBusinessBanking/t_pdco_credit_card_mov/"
HDFSUtils(sc).get_date_partitions(path, process_date = "2018-08-14")

['2018-08-14',
 '2018-06-08',
 '2018-05-27',
 '2017-09-30',
 '2017-09-29',
 '2017-08-31',
 '2017-07-31',
 '2017-06-30',
 '2017-05-31',
 '2017-04-28',
 '2017-03-31',
 '2017-02-28',
 '2017-01-31',
 '2016-12-31']

In [None]:
# Filter by a cut-off date with an explicit comparison operator. With operation="<"
# the output below contains only partitions strictly AFTER process_date, i.e. the
# operator appears to be applied as `process_date < partition`.
# NOTE(review): confirm this operand order in HDFSUtils.get_date_partitions.

path = "/data/master/pdco/data/retailBusinessBanking/t_pdco_credit_card_mov/"
HDFSUtils(sc).get_date_partitions(path, process_date = "2018-08-14", operation="<")

['2020-01-31',
 '2020-01-30',
 '2020-01-29',
 '2020-01-28',
 '2020-01-27',
 '2020-01-21',
 '2019-09-30',
 '2019-09-20',
 '2019-07-31',
 '2019-06-28',
 '2019-05-27',
 '2019-05-26',
 '2019-05-24',
 '2019-05-23',
 '2019-04-30',
 '2019-03-31',
 '2019-02-28',
 '2018-12-13',
 '2018-10-31']

In [None]:
# Raw (Avro) sources name their partitions cutoff_date=YYYYMMDD (e.g. cutoff_date=20180531),
# so the matching date_format='%Y%m%d' is passed to the HDFSUtils constructor; the
# returned dates keep that format (see the output below).

path = "/data/raw/pext/data/t_pext_rcc_balance"
HDFSUtils(sc, date_format = '%Y%m%d').get_date_partitions(path)

['20180731', '20180630', '20180531']

# Import DataFrameUtils

In [None]:
from DataFrameUtils import DataFrameUtils

### Reading without partition filters

In [None]:
# Read a Parquet table from its root path without selecting partitions; the partition
# column (cutoff_date) is included in the result (see the output below).

path = "/data/master/pctk/data/t_pctk_rcc_balance"

DataFrameUtils(spark, sc=sc).read_dataframe(path).show(2)

+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+-----------+
|register_type_type|sbs_customer_id|sbs_entity_id|sbs_credit_type|product_definer_id|delay_days_number|balance_amount|credit_risk_type|   audtiminsert_date|cutoff_date|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+-----------+
|                 2|     0198590495|        00006|             09|    14111302000000|                0|  39201.770000|               0|2020-06-01 02:08:...| 2020-07-31|
|                 2|     0198105163|        00109|             09|    14181300000000|                0|    651.230000|               0|2020-06-01 02:08:...| 2020-07-31|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------

In [None]:
# Read several specific Parquet partition directories at once via `paths`.
# Without a basePath option the partition column (cutoff_date) is NOT present
# in the result (see the output below).

paths = ["/data/master/pctk/data/t_pctk_rcc_balance/cutoff_date=2020-06-30", "/data/master/pctk/data/t_pctk_rcc_balance/cutoff_date=2020-07-31"]

DataFrameUtils(spark, sc=sc).read_dataframe(paths=paths).show(2)

+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+
|register_type_type|sbs_customer_id|sbs_entity_id|sbs_credit_type|product_definer_id|delay_days_number|balance_amount|credit_risk_type|   audtiminsert_date|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+
|                 2|     0198590495|        00006|             09|    14111302000000|                0|  39201.770000|               0|2020-06-01 02:08:...|
|                 2|     0198105163|        00109|             09|    14181300000000|                0|    651.230000|               0|2020-06-01 02:08:...|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+
only showing top 2 rows



In [None]:
# Read specific Parquet partition directories, passing options={'basePath': <table root>}
# so the partition column (cutoff_date) is kept in the result (see the output below).
# Presumably the option is forwarded to Spark's reader — confirm in DataFrameUtils.

paths = ["/data/master/pctk/data/t_pctk_rcc_balance/cutoff_date=2020-06-30", "/data/master/pctk/data/t_pctk_rcc_balance/cutoff_date=2020-07-31"]

DataFrameUtils(spark, sc=sc).read_dataframe(paths=paths, options = {'basePath': "/data/master/pctk/data/t_pctk_rcc_balance/"}).show(2)

+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+-----------+
|register_type_type|sbs_customer_id|sbs_entity_id|sbs_credit_type|product_definer_id|delay_days_number|balance_amount|credit_risk_type|   audtiminsert_date|cutoff_date|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+-----------+
|                 2|     0198590495|        00006|             09|    14111302000000|                0|  39201.770000|               0|2020-06-01 02:08:...| 2020-07-31|
|                 2|     0198105163|        00109|             09|    14181300000000|                0|    651.230000|               0|2020-06-01 02:08:...| 2020-07-31|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------

In [None]:
# Read a raw (Avro) table from its root path without selecting partitions. Its
# partitions use the YYYYMMDD format (cutoff_date=20180531), so date_format='%Y%m%d'
# is passed to the DataFrameUtils constructor.

path = "/data/raw/pext/data/t_pext_rcc_balance/"

DataFrameUtils(spark, sc=sc, date_format = '%Y%m%d').read_dataframe(path).show(2)

+------------------+---------------+-------------+---------------+------------------+-----------------+------------------+----------------+-----------------+-----------+
|register_type_type|sbs_customer_id|sbs_entity_id|sbs_credit_type|product_definer_id|delay_days_number|    balance_amount|credit_risk_type|audtiminsert_date|cutoff_date|
+------------------+---------------+-------------+---------------+------------------+-----------------+------------------+----------------+-----------------+-----------+
|                 2|     0095266282|        00238|             10|    14110206020000|             0000|000000000000192030|               0|       1588985191|   20180531|
|                 2|     0095266282|        00238|             10|    84141000000000|             0000|000000000000192030|               0|       1588985191|   20180531|
+------------------+---------------+-------------+---------------+------------------+-----------------+------------------+----------------+-----------

In [None]:
# Read a pipe-delimited .txt file whose first line is a header row.

path = "/in/staging/ratransmit/external/unsubs_20161031.txt"

DataFrameUtils(spark, sc=sc).read_dataframe(path, options={"delimiter":"|", "header": True}).show(2)

+-------------+------------+-----------+------+--------------------+------------+------------------------+------------------+
|SubscriberKey|EmailAddress|FLG_CLIENTE|SendID|           EventDate|   EventType|TriggeredSendExternalKey|       UnsubReason|
+-------------+------------+-----------+------+--------------------+------------+------------------------+------------------+
|    105zbcvi9|        null|          1|146808|2018-09-04 10:17:...|Unsubscribed|                    null|enlace_baja_footer|
|    11j50rzzz|        null|          1|146808|2018-09-04 10:17:...|Unsubscribed|                    null|enlace_baja_footer|
+-------------+------------+-----------+------+--------------------+------------+------------------------+------------------+
only showing top 2 rows



In [None]:
# Read a .csv file whose first line is a header row. No delimiter option is given,
# so the reader's default is used (presumably a comma — confirm).

path = "/in/staging/ratransmit/external/users_20180627.csv"

DataFrameUtils(spark, sc=sc).read_dataframe(path, options={"header":True}).show(2)

+-------+----------+---------+
|user_id|first_name|last_name|
+-------+----------+---------+
|    001|       leo|    lopez|
|    001|       leo|  juanelo|
+-------+----------+---------+
only showing top 2 rows



In [None]:
# Read a pipe-delimited .dat file with no header row; the columns therefore get
# generic names _c0, _c1, ... (see the output below).

path = "/in/staging/ratransmit/external/v_pdco_monthly_transactional_rcd_20190128.dat"

DataFrameUtils(spark, sc=sc).read_dataframe(path, options={"delimiter":"|"}).show(2)

+----+---+----+---+--------+------------------+----+----+----------+---+---------------+----+-----+-----+----+--------------------+----+----+----+----+----+
| _c0|_c1| _c2|_c3|     _c4|               _c5| _c6| _c7|       _c8|_c9|           _c10|_c11| _c12| _c13|_c14|                _c15|_c16|_c17|_c18|_c19|_c20|
+----+---+----+---+--------+------------------+----+----+----------+---+---------------+----+-----+-----+----+--------------------+----+----+----+----+----+
|0011|009|0383|  3|21773408|001107855001558510|0050|0001|0001-01-01|  0|141903020206061| PEN| 3.50| 3.50|   R|BAZO FLORES CAROL...|  06|  02|  99|   ?|   ?|
|0011|009|0272|  3|26877532|001102725000188850|0050|0001|0001-01-01|  0|141903020106021| PEN|16.65|16.65|   R|CRUZ MEDINA CLAUD...|  06|  02|  99|   ?|   ?|
+----+---+----+---+--------+------------------+----+----+----------+---+---------------+----+-----+-----+----+--------------------+----+----+----+----+----+
only showing top 2 rows



In [None]:
# Read a .ctl control file with default options; here it yields a single
# column _c0 holding one value (see the output below).

path = "/in/staging/ratransmit/external/kexc/controlFeedBack_JOURNEY_20200512.ctl"

DataFrameUtils(spark, sc=sc).read_dataframe(path).show(2)

+------+
|   _c0|
+------+
|121193|
+------+



In [None]:
# Read a table from its root path without selecting any partition.

path = "/data/master/pdco/data/cross/v_pdco_geo_location_catalog/"

DataFrameUtils(spark, sc=sc).read_dataframe(path).show(2)

+---------+----------+---------------------------+-----------------------------+----------------------------+----------------------+----------------------+----------------+----------+--------------------+-----------+--------------------+
|entity_id|country_id|inei_address_geolocation_id|reniec_address_geolocation_id|sunat_address_geolocation_id|address_geolocation_id|geolocation_group_desc|geolocation_type|zipcode_id|address_zone_type_id|cutoff_date|   audtiminsert_date|
+---------+----------+---------------------------+-----------------------------+----------------------------+----------------------+----------------------+----------------+----------+--------------------+-----------+--------------------+
|     0011|        PE|                     040115|                         null|                        null|               0601015|              QUEQUE�A|               1|         @|                   P| 2017-09-30|2018-09-07 20:55:...|
|     0011|        PE|                     04040

### Reading with partition filters

In [None]:
# Read only the most recent partition of a Parquet table (partition_number=1).
# Without basePath the partition column (cutoff_date) is absent (see the output below).
# NOTE(review): this cell calls read_dataframes (plural) while the earlier cells call
# read_dataframe; confirm both methods exist in DataFrameUtils.

path = "/data/master/pctk/data/t_pctk_rcc_balance"

DataFrameUtils(spark, sc=sc).read_dataframes(path, partition_number = 1).show(2)

+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+
|register_type_type|sbs_customer_id|sbs_entity_id|sbs_credit_type|product_definer_id|delay_days_number|balance_amount|credit_risk_type|   audtiminsert_date|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+
|                 2|     0198590495|        00006|             09|    14111302000000|                0|  39201.770000|               0|2020-06-01 02:08:...|
|                 2|     0198105163|        00109|             09|    14181300000000|                0|    651.230000|               0|2020-06-01 02:08:...|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+
only showing top 2 rows



In [None]:
# Read the three most recent partitions of a Parquet table, passing the table root
# as basePath so the partition column (cutoff_date) is kept in the result.
# NOTE(review): uses read_dataframes (plural); confirm it exists in DataFrameUtils.

path = "/data/master/pctk/data/t_pctk_rcc_balance"

DataFrameUtils(spark, sc=sc).read_dataframes(path, partition_number = 3, options = {"basePath": path}).show(2)

+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+-----------+
|register_type_type|sbs_customer_id|sbs_entity_id|sbs_credit_type|product_definer_id|delay_days_number|balance_amount|credit_risk_type|   audtiminsert_date|cutoff_date|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+-----------+
|                 2|     0198590495|        00006|             09|    14111302000000|                0|  39201.770000|               0|2020-06-01 02:08:...| 2020-07-31|
|                 2|     0198105163|        00109|             09|    14181300000000|                0|    651.230000|               0|2020-06-01 02:08:...| 2020-07-31|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------

In [None]:
# Read the partitions whose dates fall within ["2020-05-31", "2020-07-31"]
# ([earliest, latest]), keeping the partition column via basePath.
# NOTE(review): uses read_dataframes (plural); confirm it exists in DataFrameUtils.

path = "/data/master/pctk/data/t_pctk_rcc_balance"

DataFrameUtils(spark, sc=sc).read_dataframes(path, process_date=["2020-05-31", "2020-07-31"], options = {"basePath": path}).show(2)

+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+-----------+
|register_type_type|sbs_customer_id|sbs_entity_id|sbs_credit_type|product_definer_id|delay_days_number|balance_amount|credit_risk_type|   audtiminsert_date|cutoff_date|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------+-----------+
|                 2|     0198590495|        00006|             09|    14111302000000|                0|  39201.770000|               0|2020-06-01 02:08:...| 2020-07-31|
|                 2|     0198105163|        00109|             09|    14181300000000|                0|    651.230000|               0|2020-06-01 02:08:...| 2020-07-31|
+------------------+---------------+-------------+---------------+------------------+-----------------+--------------+----------------+--------------------