### read_file

In [1]:
from pyspark.sql import SparkSession

In [2]:
def read_file(spark_session: SparkSession, file_name, ptw='../data/tables/', type='parquet', truncate=80, sep=','):
    '''
To read different type of file use spark. And show the first metadata after read. 

Parameters
----------
spark_session : DataFrame
file_name: str
    The full name of the file to read. 
ptw : 
    The relative path of the file to read, default '../data/tables/'
type : {'parquet', 'csv'}, default 'parquet'
truncate : int, default 80
    Parameter of `show` function spark dataframe, which controll the maximum 
    number of characters per row.
sep : str, default ','
    For csv reading, control the seperate character.


Returns
-------
Spark DataFrame
    A DataFrame of the read file.


Examples
--------
>>> sdf = read_file(spark, 'tbl_merchants.parquet')
|> Loading File...
|> Loading Finished!
-RECORD 0----------------------------------------------------------------------------------------
 name         | Felis Limited                                                                    
 tags         | ((furniture, home furnishings and equipment shops, and manufacturers, except ... 
 merchant_abn | 10023283211                                                                      
only showing top 1 row

>>> sdf = read_file(spark, 'tbl_merchants.parquet', truncate=20)
|> Loading File...
|> Loading Finished!
-RECORD 0----------------------------
 name         | Felis Limited        
 tags         | ((furniture, home... 
 merchant_abn | 10023283211          
only showing top 1 row

>>> sdf = read_file(spark, 'tbl_consumer.csv', type='csv', sep='|')
|> Loading File...
|> Loading Finished!
-RECORD 0---------------------------------
 name        | Yolanda Williams           
 address     | 413 Haney Gardens Apt. 742 
 state       | WA                         
 postcode    | 6935                       
 gender      | Female                     
 consumer_id | 1195503                    
only showing top 1 row
    '''

    # read file
    print('|> Loading File...')
    if type == 'csv':
        sdf = spark_session.read.csv(f'{ptw}{file_name}', sep=sep, header=True)

    elif type == 'parquet':
        sdf = spark_session.read.parquet(f'{ptw}{file_name}')
    print('|> Loading Finished!')

    # print the first row of data 
    sdf.show(1, vertical=True, truncate=truncate)
    return sdf

In [3]:
spark = (
    # Create a spark session (which will run spark jobs)
    SparkSession.builder.appName("Project 1")
    .config("spark.sql.repl.eagerEval.enabled", True) 
    .config("spark.sql.parquet.cacheMetadata", "true")
    .config('spark.executor.memory','10g')
    .config('spark.driver.memory','12g')
    .config('spark.driver.maxResultsSize', '10 GiB')
    .config('spark.shuffle.file.buffer', '64k')
    # .config("spark.network.timeout", "3600s")
    # .master("local[6]")
    .getOrCreate()
    )
sdf = read_file(spark, 'tbl_merchants.parquet')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/31 10:56:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
|> Loading File...


                                                                                

|> Loading Finished!
-RECORD 0----------------------------------------------------------------------------------------
 name         | Felis Limited                                                                    
 tags         | ((furniture, home furnishings and equipment shops, and manufacturers, except ... 
 merchant_abn | 10023283211                                                                      
only showing top 1 row



In [4]:
sdf = read_file(spark, 'consumer_user_details.parquet')

|> Loading File...
|> Loading Finished!
-RECORD 0--------------
 user_id     | 1       
 consumer_id | 1195503 
only showing top 1 row



In [5]:
sdf = read_file(spark, 'transactions_20210228_20210827_snapshot')

|> Loading File...


                                                                                

|> Loading Finished!
-RECORD 0----------------------------------------------
 user_id        | 18478                                
 merchant_abn   | 62191208634                          
 dollar_value   | 63.255848959735246                   
 order_id       | 949a63c8-29f7-4ab0-ada4-99ac50a88952 
 order_datetime | 2021-08-20                           
only showing top 1 row



### create_folder

In [6]:
import os

In [7]:
def create_folder(path):
    '''
Create folder.

Parameters
----------
path : str
    The relative path of the new folder. 


Examples
--------
>>> create_folder('../data/temp')
|> Create Successfully!

>>> create_folder('../data/tables/consumer_user_details.parquet')
|> The folder name duplicated with a file!
|> Files already exist under the upper folder:
   ['transactions_20210228_20210827_snapshot', '.DS_Store', '.gitkeep', 'consumer_user_details.parquet', 'tbl_consumer.csv', 'tbl_merchants.parquet']

>>> create_folder('../data/tables')
|> The folder already exist!
|> Files already exist under this folder:
   ['transactions_20210228_20210827_snapshot', '.DS_Store', '.gitkeep', 'consumer_user_details.parquet', 'tbl_consumer.csv', 'tbl_merchants.parquet']
    '''

    # folder should not already exist
    if not os.path.exists(path):
        os.makedirs(path)
        print('|> Create Successfully!')
    
    # if the folder aleady created, the print out the files under this folder
    elif os.path.isdir(path):
        print(f'|> The folder already exist!\n|> Files already exist under this folder:\n   {os.listdir(path)}')
    
    # the name of the new folder is the same as a file already exist under the upper folder
    elif os.path.isfile(path):
        upper_path = '/'.join(path.split('/')[:-1])
        print(f'|> The folder name duplicated with a file!\n|> Files already exist under the upper folder:\n   {os.listdir( upper_path )}')
    return 

In [6]:
create_folder('../data/temp')

|> Create Successfully!


### temp_record_sdf

In [6]:
def temp_record_sdf(sdf:SparkSession, path = '../data/temp', overwrite = False):
    '''
Save current progress for future steps

Parameters
----------
sdf : spark dataframe
path : str
    Path to save data, defualt as `../data/temp`
overwrite : bool
    Set if cover the origin data, defualt False

Examples
--------
>>> temp_record_sdf(sdf, path='../data/temp')
>>> temp_record_sdf(sdf, path='../data/temp')
>>> temp_record_sdf(sdf, path='../data/temp', overwrite=True)
|> Waitting for saving...
|> Save Successfully!
--
|> Waitting for saving...
|> The folder already exist! Change the attr `overwrite` to cover the origin data.
-- 
|> Waitting for saving...
|> Save Successfully!

>>> print(os.listdir( '../data' ))
>>> print(os.path.isfile( '../data/temp.parquet' ))
>>> temp_record_sdf(sdf, path='../data/temp.parquet')
>>> temp_record_sdf(sdf, path='../data/temp.parquet', overwrite=True)
['tables', '.gitkeep', 'README.md', 'temp.parquet', 'curated']
--
True
--
|> The name duplicated with a file!
   Change the name or change the attr `overwrite` to cover the origin data.
--
|> Waitting for saving...
|> Save Successfully!
    '''


    # folder should not already exist
    if not os.path.exists(path):
        print('|> Waitting for saving...')
        sdf.write.parquet(path)
        print('|> Save Successfully!')
    
    # if the folder aleady created, the print out the files under this folder
    elif os.path.isdir(path):
        try:
            print('|> Waitting for saving...')
            if (overwrite):
                sdf.write.partitionBy('order_datetime').parquet(path, mode = 'overwrite')
            else:
                sdf.write.parquet(path)
            print('|> Save Successfully!')
        except Exception:
            print('|> The folder already exist! Change the attr `overwrite` to cover the origin data.')
    
    # the name of the new folder is the same as a file already exist under the upper folder
    elif os.path.isfile(path):
        if (overwrite):
            print('|> Waitting for saving...')
            sdf.write.parquet(path, mode = 'overwrite')
            print('|> Save Successfully!')
        else:
            print(f'|> The name duplicated with a file!\n   Change the name or change the attr `overwrite` to cover the origin data.')

    return 

In [10]:
sdf.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- merchant_abn: long (nullable = true)
 |-- dollar_value: double (nullable = true)
 |-- order_id: string (nullable = true)
 |-- order_datetime: date (nullable = true)



In [12]:
# DataFrame转换成RDD
result = sdf.rdd.map ( lambda p : "user_id: " + str(p.user_id)+ " merchant_abn: " + str(p.merchant_abn) + \
    " dollar_value: " + str(p.dollar_value) + " order_id: " + \
    p.order_id + " order_datetime: " + str(p.order_datetime) ).collect()



                                                                                

In [35]:
temp_record_sdf(sdf, path='../data/temp')
# temp_record_sdf(sdf, path='../data/temp')
# temp_record_sdf(sdf, path='../data/temp', overwrite=True)

|> Waitting for saving...


[Stage 28:>                                                         (0 + 8) / 8]

22/08/31 10:18:41 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
|> Save Successfully!


                                                                                

### temp_record_query

In [9]:
def temp_record_query(sql_query:SparkSession.sql, *cols,\
    path='../data/temp', overwrite = False):
    '''
Save current progress for future steps

Parameters
----------
sql_query : spark sql query
*cols : 'ColumnsOrName'
    Name of columns.
path : str
    Path to save data, defualt as `../data/temp`
overwrite : bool
    Set if cover the origin data, defualt False


Examples
--------
>>> sql_query = sdf.orderBy('merchant_abn')
>>> temp_record_query(sql_query, 'name', 'tags', 'merchant_abn')
|> Waitting for saving...
|> Save Successfully!
    '''
    # convert to spark dataframe and save
    temp_record_sdf(sql_query.toDF(*cols), path=path, overwrite=overwrite)
    return 

In [10]:
sql_query = sdf.orderBy('merchant_abn')
temp_record_query(sql_query, 'name', 'tags', 'merchant_abn', overwrite=True)

|> Waitting for saving...
|> Save Successfully!


### transfor between DataFrame and RDD

In [None]:
from pyspark.sql import Row

# DataFrame to RDD
srdd = sdf.rdd.map ( lambda p: Row(user_id=p.user_id, merchant_abn=p.merchant_abn, \
    dollar_value=p.dollar_value, order_id=p.order_id, order_datetime=p.order_datetime) )

# RDD to DataFrame
sdf = spark.createDataFrame( srdd )