In [1]:
from azure.storage.blob import BlockBlobService
from pyspark            import SparkConf,    SparkContext
from pyspark.sql        import SparkSession, SQLContext
from pyspark.sql        import functions as f
from pyspark.sql.types  import *
import doctest

import os

# Storage account settings. The account key is a secret, so it is read from
# the environment instead of being stored in the notebook.
# NOTE(review): a key was previously hardcoded in this cell; it should be
# treated as leaked and rotated in Azure.
storageAccountName = "dvbatch"
storageKey         = os.environ.get("AZURE_STORAGE_KEY")
containerName      = "output"
file               = "fixed-width-1.txt"

# Fail early with a clear message instead of an authentication error later on
if not storageKey:
    raise RuntimeError(
        "Set the AZURE_STORAGE_KEY environment variable to the access key "
        "of the '" + storageAccountName + "' storage account."
    )

# Establish connection with the blob storage account
blobService = BlockBlobService(account_name=storageAccountName,
                               account_key =storageKey
                               )

In [2]:
# Start (or reuse) the local Spark session used by every cell below
spark = (
    SparkSession.builder
    .master("local")
    .appName("fixed-width")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [3]:
def remove_header(_file = file):
    '''
    Read a fixed-width .txt file and drop its header line.

    The file is read as a single-column DataFrame (column ``value``, one row
    per line). The first row is taken as the header and every row whose text
    is exactly equal to it is filtered out. The first five remaining rows
    are printed untruncated before the DataFrame is returned. A file with no
    lines is returned as the empty DataFrame it was read into.

    Parameters
    ----------
    _file : str
        Path to the .txt file we'll be reading. Defaults to the value the
        global ``file`` had when this function was defined.

    Returns
    -------
    DataFrame
        The ``value`` column of the file without its header row(s).

    Examples
    --------
    >>> remove_header(file)
    +----------------------------------------------------+
    |value                                               |
    +----------------------------------------------------+
    | 16524  01  3930621977  TXNPUES                     |
    |191675  01  2368183100  OUNHQEX                     |
    |191667  01  3714468136  GHAKASC                     |
    |191673  01  2632703881  PAHFSAP                     |
    | 80495  01  2766389794  XDZANTV                     |
    +----------------------------------------------------+
    only showing top 5 rows
    <BLANKLINE>
    DataFrame[value: string]
    '''
    # Read in fixed-width text file into DataFrame
    df = spark.read.text(_file)

    # first() returns None for an empty file, so only filter when a header exists
    first_row = df.first()
    if first_row is not None:
        header = first_row[0]
        # Exact comparison: a substring match would also drop data rows that
        # merely contain the header text (and every row if the header is "")
        df = df.filter(f.col("value") != header)

    df.show(5, False)

    return df
    
# Read the input file once and keep the header-free lines for the later cells
df = remove_header(file)

+----------------------------------------------------+
|value                                               |
+----------------------------------------------------+
| 16524  01  3930621977  TXNPUES                     |
|191675  01  2368183100  OUNHQEX                     |
|191667  01  3714468136  GHAKASC                     |
|191673  01  2632703881  PAHFSAP                     |
| 80495  01  2766389794  XDZANTV                     |
+----------------------------------------------------+
only showing top 5 rows



In [4]:
# Run the example embedded in remove_header's docstring against the notebook's
# globals; verbose=True prints the attempted call and the expected output
doctest.run_docstring_examples(remove_header, globals(), verbose=True)

Finding tests in NoName
Trying:
    remove_header(file)
Expecting:
    +----------------------------------------------------+
    |value                                               |
    +----------------------------------------------------+
    | 16524  01  3930621977  TXNPUES                     |
    |191675  01  2368183100  OUNHQEX                     |
    |191667  01  3714468136  GHAKASC                     |
    |191673  01  2632703881  PAHFSAP                     |
    | 80495  01  2766389794  XDZANTV                     |
    +----------------------------------------------------+
    only showing top 5 rows
    <BLANKLINE>
    DataFrame[value: string]
ok


In [5]:
def sort_df(_df = df):
    '''
    Slice each fixed-width line into its four named columns.

    Every column is cut out of the single ``value`` column by start position
    (counted from 1) and length, so the padding spaces inside each slice are
    kept. The resulting DataFrame and its schema are printed before it is
    returned.

    Parameters
    ----------
    _df : DataFrame
        DataFrame holding one line of the file per row in a string column
        named ``value``. Defaults to the value the global ``df`` had when
        this function was defined.

    Returns
    -------
    DataFrame
        String columns ``Entry``, ``Per``, ``Account`` and ``Description``.

    Examples
    --------
    >>> sort_df(df)
    +------+---+-----------+-----------+
    | Entry|Per|    Account|Description|
    +------+---+-----------+-----------+
    | 16524| 01| 3930621977| TXNPUES   |
    |191675| 01| 2368183100| OUNHQEX   |
    |191667| 01| 3714468136| GHAKASC   |
    |191673| 01| 2632703881| PAHFSAP   |
    | 80495| 01| 2766389794| XDZANTV   |
    | 80507| 01| 4609266335| BWWYEZL   |
    | 80509| 01| 1092717420| QJYPKVO   |
    | 80497| 01| 3386366766| SOQLCMU   |
    |191669| 01| 5905893739| FYIWNKA   |
    |191671| 01| 2749355876|   CBMJTLP |
    +------+---+-----------+-----------+
    <BLANKLINE>
    root
     |-- Entry: string (nullable = true)
     |-- Per: string (nullable = true)
     |-- Account: string (nullable = true)
     |-- Description: string (nullable = true)
    <BLANKLINE>
    DataFrame[Entry: string, Per: string, Account: string, Description: string]
    '''
    # (column name, start position, length) of every field in a line
    layout = [
        ('Entry',        1,  6),
        ('Per',          8,  3),
        ('Account',     12, 11),
        ('Description', 24, 11),
    ]

    line = _df.value
    fields = [line.substr(start, length).alias(name)
              for name, start, length in layout]

    split_df = _df.select(*fields)
    split_df.show()
    split_df.printSchema()

    return split_df

# Split the raw lines into columns; later cells reassign sorted_df in place
sorted_df = sort_df(df)

+------+---+-----------+-----------+
| Entry|Per|    Account|Description|
+------+---+-----------+-----------+
| 16524| 01| 3930621977| TXNPUES   |
|191675| 01| 2368183100| OUNHQEX   |
|191667| 01| 3714468136| GHAKASC   |
|191673| 01| 2632703881| PAHFSAP   |
| 80495| 01| 2766389794| XDZANTV   |
| 80507| 01| 4609266335| BWWYEZL   |
| 80509| 01| 1092717420| QJYPKVO   |
| 80497| 01| 3386366766| SOQLCMU   |
|191669| 01| 5905893739| FYIWNKA   |
|191671| 01| 2749355876|   CBMJTLP |
+------+---+-----------+-----------+

root
 |-- Entry: string (nullable = true)
 |-- Per: string (nullable = true)
 |-- Account: string (nullable = true)
 |-- Description: string (nullable = true)



In [6]:
# Run the example embedded in sort_df's docstring against the notebook's globals
doctest.run_docstring_examples(sort_df, globals(), verbose=True)

Finding tests in NoName
Trying:
    sort_df(df)
Expecting:
    +------+---+-----------+-----------+
    | Entry|Per|    Account|Description|
    +------+---+-----------+-----------+
    | 16524| 01| 3930621977| TXNPUES   |
    |191675| 01| 2368183100| OUNHQEX   |
    |191667| 01| 3714468136| GHAKASC   |
    |191673| 01| 2632703881| PAHFSAP   |
    | 80495| 01| 2766389794| XDZANTV   |
    | 80507| 01| 4609266335| BWWYEZL   |
    | 80509| 01| 1092717420| QJYPKVO   |
    | 80497| 01| 3386366766| SOQLCMU   |
    |191669| 01| 5905893739| FYIWNKA   |
    |191671| 01| 2749355876|   CBMJTLP |
    +------+---+-----------+-----------+
    <BLANKLINE>
    root
     |-- Entry: string (nullable = true)
     |-- Per: string (nullable = true)
     |-- Account: string (nullable = true)
     |-- Description: string (nullable = true)
    <BLANKLINE>
    DataFrame[Entry: string, Per: string, Account: string, Description: string]
ok


In [7]:
def remove_spaces(_sorted_df = sorted_df):
    '''
    Strip leading and trailing spaces from every column of a DataFrame.

    Each column is replaced by its trimmed value under the same name, and
    the resulting DataFrame is printed before it is returned.

    Parameters
    ----------
    _sorted_df : dataframe
        Dataframe to be manipulated. Defaults to the value the global
        ``sorted_df`` had when this function was defined.

    Returns
    -------
    DataFrame
        Same columns, in the same order, with the surrounding spaces removed.

    Examples
    --------
    >>> remove_spaces(sorted_df)
    +------+---+----------+-----------+
    | Entry|Per|   Account|Description|
    +------+---+----------+-----------+
    | 16524| 01|3930621977|    TXNPUES|
    |191675| 01|2368183100|    OUNHQEX|
    |191667| 01|3714468136|    GHAKASC|
    |191673| 01|2632703881|    PAHFSAP|
    | 80495| 01|2766389794|    XDZANTV|
    | 80507| 01|4609266335|    BWWYEZL|
    | 80509| 01|1092717420|    QJYPKVO|
    | 80497| 01|3386366766|    SOQLCMU|
    |191669| 01|5905893739|    FYIWNKA|
    |191671| 01|2749355876|    CBMJTLP|
    +------+---+----------+-----------+
    <BLANKLINE>
    DataFrame[Entry: string, Per: string, Account: string, Description: string]
    '''
    # Trim all columns in a single select() rather than a withColumn() loop,
    # which would add one more projection to the plan per column
    trimmed = [f.trim(f.col(colname)).alias(colname)
               for colname in _sorted_df.columns]
    _sorted_df = _sorted_df.select(trimmed)

    # Print the result so the removal of the spaces can be checked by eye
    _sorted_df.show()

    return _sorted_df
    
# Replace sorted_df with its trimmed version for the following cells
sorted_df = remove_spaces(sorted_df)

+------+---+----------+-----------+
| Entry|Per|   Account|Description|
+------+---+----------+-----------+
| 16524| 01|3930621977|    TXNPUES|
|191675| 01|2368183100|    OUNHQEX|
|191667| 01|3714468136|    GHAKASC|
|191673| 01|2632703881|    PAHFSAP|
| 80495| 01|2766389794|    XDZANTV|
| 80507| 01|4609266335|    BWWYEZL|
| 80509| 01|1092717420|    QJYPKVO|
| 80497| 01|3386366766|    SOQLCMU|
|191669| 01|5905893739|    FYIWNKA|
|191671| 01|2749355876|    CBMJTLP|
+------+---+----------+-----------+



In [8]:
# Run the example embedded in remove_spaces' docstring.
# NOTE(review): sorted_df was already reassigned to the trimmed DataFrame by
# the previous cell, so this example runs remove_spaces on trimmed input.
doctest.run_docstring_examples(remove_spaces, globals(), verbose=True)

Finding tests in NoName
Trying:
    remove_spaces(sorted_df)
Expecting:
    +------+---+----------+-----------+
    | Entry|Per|   Account|Description|
    +------+---+----------+-----------+
    | 16524| 01|3930621977|    TXNPUES|
    |191675| 01|2368183100|    OUNHQEX|
    |191667| 01|3714468136|    GHAKASC|
    |191673| 01|2632703881|    PAHFSAP|
    | 80495| 01|2766389794|    XDZANTV|
    | 80507| 01|4609266335|    BWWYEZL|
    | 80509| 01|1092717420|    QJYPKVO|
    | 80497| 01|3386366766|    SOQLCMU|
    |191669| 01|5905893739|    FYIWNKA|
    |191671| 01|2749355876|    CBMJTLP|
    +------+---+----------+-----------+
    <BLANKLINE>
    DataFrame[Entry: string, Per: string, Account: string, Description: string]
ok


In [9]:
def cast_columns(_sorted_df = sorted_df):
    '''
    Convert the four columns to their target Spark types.

    ``Entry`` and ``Account`` become long, ``Per`` becomes int and
    ``Description`` stays a string. The converted DataFrame and its schema
    are printed before it is returned.

    Parameters
    ----------
    _sorted_df : dataframe
        Dataframe to be manipulated; it must provide the columns ``Entry``,
        ``Per``, ``Account`` and ``Description``. Defaults to the value the
        global ``sorted_df`` had when this function was defined.

    Returns
    -------
    DataFrame
        The four columns, in the order above, with their new types.

    Examples
    --------
    >>> cast_columns(sorted_df)
    +------+---+----------+-----------+
    | Entry|Per|   Account|Description|
    +------+---+----------+-----------+
    | 16524|  1|3930621977|    TXNPUES|
    |191675|  1|2368183100|    OUNHQEX|
    |191667|  1|3714468136|    GHAKASC|
    |191673|  1|2632703881|    PAHFSAP|
    | 80495|  1|2766389794|    XDZANTV|
    | 80507|  1|4609266335|    BWWYEZL|
    | 80509|  1|1092717420|    QJYPKVO|
    | 80497|  1|3386366766|    SOQLCMU|
    |191669|  1|5905893739|    FYIWNKA|
    |191671|  1|2749355876|    CBMJTLP|
    +------+---+----------+-----------+
    <BLANKLINE>
    root
     |-- Entry: long (nullable = true)
     |-- Per: integer (nullable = true)
     |-- Account: long (nullable = true)
     |-- Description: string (nullable = true)
    <BLANKLINE>
    DataFrame[Entry: bigint, Per: int, Account: bigint, Description: string]
    '''
    # Target type of every column, listed in output order
    target_types = (
        ('Entry',       'long'  ),
        ('Per',         'int'   ),
        ('Account',     'long'  ),
        ('Description', 'string'),
    )

    typed_df = _sorted_df.select(
        [f.col(name).cast(dtype) for name, dtype in target_types]
    )
    typed_df.show()
    typed_df.printSchema()

    return typed_df

# Replace sorted_df with its typed version; this is the DataFrame written out below
sorted_df = cast_columns(sorted_df)

+------+---+----------+-----------+
| Entry|Per|   Account|Description|
+------+---+----------+-----------+
| 16524|  1|3930621977|    TXNPUES|
|191675|  1|2368183100|    OUNHQEX|
|191667|  1|3714468136|    GHAKASC|
|191673|  1|2632703881|    PAHFSAP|
| 80495|  1|2766389794|    XDZANTV|
| 80507|  1|4609266335|    BWWYEZL|
| 80509|  1|1092717420|    QJYPKVO|
| 80497|  1|3386366766|    SOQLCMU|
|191669|  1|5905893739|    FYIWNKA|
|191671|  1|2749355876|    CBMJTLP|
+------+---+----------+-----------+

root
 |-- Entry: long (nullable = true)
 |-- Per: integer (nullable = true)
 |-- Account: long (nullable = true)
 |-- Description: string (nullable = true)



In [10]:
# Run the example embedded in cast_columns' docstring.
# NOTE(review): sorted_df was already reassigned to the cast DataFrame by the
# previous cell, so this example runs cast_columns on already-typed input.
doctest.run_docstring_examples(cast_columns, globals(), verbose=True)

Finding tests in NoName
Trying:
    cast_columns(sorted_df)
Expecting:
    +------+---+----------+-----------+
    | Entry|Per|   Account|Description|
    +------+---+----------+-----------+
    | 16524|  1|3930621977|    TXNPUES|
    |191675|  1|2368183100|    OUNHQEX|
    |191667|  1|3714468136|    GHAKASC|
    |191673|  1|2632703881|    PAHFSAP|
    | 80495|  1|2766389794|    XDZANTV|
    | 80507|  1|4609266335|    BWWYEZL|
    | 80509|  1|1092717420|    QJYPKVO|
    | 80497|  1|3386366766|    SOQLCMU|
    |191669|  1|5905893739|    FYIWNKA|
    |191671|  1|2749355876|    CBMJTLP|
    +------+---+----------+-----------+
    <BLANKLINE>
    root
     |-- Entry: long (nullable = true)
     |-- Per: integer (nullable = true)
     |-- Account: long (nullable = true)
     |-- Description: string (nullable = true)
    <BLANKLINE>
    DataFrame[Entry: bigint, Per: int, Account: bigint, Description: string]
ok


In [11]:

def createTimeStamp(): 
    """
    Build a path suffix from the current local date and time.

    The suffix is appended to the output path so that a new run does not
    save over the original files.

    Returns
    -------
    str
        "/" followed by day-month-year_hour-minute,
        for example "/15-05-2020_17-16".
    """
    import datetime as _datetime

    # Leading "/" + dd-mm-YYYY_HH-MM, taken from the clock at call time
    return _datetime.datetime.now().strftime("/%d-%m-%Y_%H-%M")

In [12]:
# Build the name of a timestamped output folder ("output/<dd-mm-YYYY_HH-MM>")
# so that a new run does not overwrite the files of an earlier one.
# The folder itself is only created when the DataFrame is written.
output_dir   = "output" + createTimeStamp() 
print(output_dir)

output/15-05-2020_17-16


In [13]:
# Make directory and write files to it
import os
from   os import path
from   pyspark.sql.utils import AnalysisException

# Glob pattern matching the written files. It is set before the write so that
# it is defined for the following cells even when the write is skipped.
files_in_dir = output_dir + "/*"

try:
    sorted_df.write.parquet(output_dir)

except (FileExistsError, AnalysisException) as err:
    # Spark reports an already existing output path as an AnalysisException
    # rather than FileExistsError; any other analysis error is re-raised.
    # NOTE(review): relies on the message containing "already exists".
    if isinstance(err, AnalysisException) and "already exists" not in str(err):
        raise
    print("Path exists -- skipping")
    print(output_dir)

In [14]:
# Print files we just saved with Spark.
# files_in_dir is the "<output_dir>/*" pattern set in the write cell above.
import glob
print(glob.glob(files_in_dir))

['output/15-05-2020_17-16/part-00000-607454d5-5b50-469e-98c3-1a895cf59c19-c000.snappy.parquet', 'output/15-05-2020_17-16/_SUCCESS']


In [15]:
# Write/upload files to blob storage.
# The loop variable is deliberately not called `file`: that name holds the
# input file path set in the first cell and is still needed when earlier
# cells (or their doctests) are re-run.
for local_path in glob.glob(files_in_dir):
    print(local_path)
    # The local relative path doubles as the blob name inside the container
    blobService.create_blob_from_path(containerName, local_path, local_path)

output/15-05-2020_17-16/part-00000-607454d5-5b50-469e-98c3-1a895cf59c19-c000.snappy.parquet
output/15-05-2020_17-16/_SUCCESS
