In [18]:
import sys
from pyspark.sql import functions as F
from pyspark.sql.functions import * 
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from functools import reduce

# Reuse the already-running SparkContext if there is one, and wrap it in a
# GlueContext, which provides the DynamicFrame readers used below.
sc = SparkContext.getOrCreate()
gc = GlueContext(sc)
# Bind the SparkSession explicitly: a later cell calls spark.createDataFrame(...),
# and no visible cell assigns `spark`, so a fresh "Restart & Run All" would
# otherwise depend on the kernel injecting that name.
spark = gc.spark_session



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
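# Disabled alternative: load the same S3 path with Spark's native CSV reader
# (header option enabled) instead of a Glue DynamicFrame.
# source_df=spark.read.option("header","true").csv("s3://sol-dev-source/TruCape-Invoices")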


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:

# Load every CSV under the TruCape invoices prefix into a Glue DynamicFrame.
# NOTE(review): the schema printed further down shows generic col0..col8 names
# and the header text appears as a data row, so `headerText` is not being
# applied as a header switch here. The documented CSV option appears to be
# format_options={"withHeader": True} - confirm before changing, because later
# cells rename the columns and strip the header row by hand.
source_dyf = gc.create_dynamic_frame.from_options(
    connection_type='s3',
    connection_options={
        "paths": ['s3://sol-dev-source/TruCape-Invoices'],
        "recurse": True,
    },
    format="csv",
    transformation_ctx="source_dyf",
    headerText=True,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Convert the Glue DynamicFrame into a Spark DataFrame so the regular
# DataFrame API (withColumnRenamed, select, write, ...) can be used below.
source_df = source_dyf.toDF()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Inspect the loaded schema. The output shows nine string columns with the
# generic names col0..col8, i.e. the CSV header was not used for column names.
source_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- col0: string (nullable = true)
 |-- col1: string (nullable = true)
 |-- col2: string (nullable = true)
 |-- col3: string (nullable = true)
 |-- col4: string (nullable = true)
 |-- col5: string (nullable = true)
 |-- col6: string (nullable = true)
 |-- col7: string (nullable = true)
 |-- col8: string (nullable = true)

In [24]:
# (b) Create array with desired columns
# The frame was loaded with positional names (col0..col8), so map each generic
# name to its business label. The lookup is an exact match on the lower-cased
# name: chained substring replacement would also rewrite longer names that
# merely contain a key (e.g. "col10" would become "Number0").
column_labels = {
    "col0": "External Reference",
    "col1": "Number",
    "col2": "Counterparty",
    "col3": "Amount Due",
    "col4": "Currency",
    "col5": "Cost Currency",
    "col6": "Costing Rate",
    "col7": "Due Date",
    "col8": "Issue Date",
}

old_columns = source_df.schema.names
# Names without a label fall back to their lower-cased form.
new_columns = [
    column_labels.get(field.lower(), field.lower())
    for field in old_columns
]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# (c) Apply `new_columns`: rename each column in turn, rebinding `source_df`
# to the renamed frame after every step (old_columns and new_columns are
# parallel lists built in the previous cell).
for current_name, desired_name in zip(old_columns, new_columns):
    source_df = source_df.withColumnRenamed(current_name, desired_name)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Preview the renamed frame. Note in the output that the first data row still
# contains the CSV header text ("External Reference", "Number", ...).
source_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+------+-------------------+------------+--------+-------------+------------+--------+----------+
|external reference|number|       counterparty|  amount due|currency|cost currency|costing rate|due date|Issue Date|
+------------------+------+-------------------+------------+--------+-------------+------------+--------+----------+
|External Reference|Number|       Counterparty| Amount Due |Currency|Cost Currency|Costing Rate|Due Date|Issue Date|
|           SC25648|138099|         JSC TANDER|  21,124.80 |     USD|          ZAR|     15.5654|   7-Apr|    21-Jan|
|           SC25646|138127|HORIZON FRESH FRUIT|  17,100.00 |     USD|          ZAR|     15.5142|  17-Mar|    21-Jan|
|           SC25645|138129|HORIZON FRESH FRUIT|  17,100.00 |     USD|          ZAR|     15.5142|  17-Mar|    21-Jan|
+------------------+------+-------------------+------------+--------+-------------+------------+--------+----------+

In [34]:
# Materialise the Spark DataFrame as a pandas DataFrame so that a later cell
# can remove the header row with pandas indexing.
# NOTE(review): toPandas() pulls the whole dataset into local memory - fine for
# a handful of invoices, worth revisiting if the input grows.
pandas_source_df = source_df.toPandas()


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
# Print the Spark DataFrame's repr: this shows the column names and types
# only (see output), not the rows.
print(source_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[external reference: string, number: string, counterparty: string, amount due: string, currency: string, cost currency: string, costing rate: string, due date: string, Issue Date: string]

In [36]:
# Remove the CSV header line(s) that were loaded as ordinary records (the
# frame was read with generic column names, so the header text shows up as
# data). Rows are matched by content instead of by position: after the round
# trip through Spark the header is not guaranteed to be the first row, and a
# multi-file read yields one header row per file.
# Assumes the first column was renamed to its header label ("External
# Reference"); the comparison ignores case and surrounding whitespace.
# NOTE: `source_df` is temporarily a pandas DataFrame here; a later cell
# converts it back to Spark.
first_column = pandas_source_df.columns[0]
is_header_row = (
    pandas_source_df[first_column].astype(str).str.strip().str.lower()
    == str(first_column).strip().lower()
)
source_df = pandas_source_df.loc[~is_header_row, :]

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
# Convert the pandas DataFrame (header row removed) back into a Spark
# DataFrame, rebinding `source_df` to the Spark object again.
source_df=spark.createDataFrame(source_df) 

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
# Preview after the pandas round trip: the header text row is gone and only
# the invoice records remain.
source_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+------+-------------------+-----------+--------+-------------+------------+--------+----------+
|external reference|number|       counterparty| amount due|currency|cost currency|costing rate|due date|Issue Date|
+------------------+------+-------------------+-----------+--------+-------------+------------+--------+----------+
|           SC25648|138099|         JSC TANDER| 21,124.80 |     USD|          ZAR|     15.5654|   7-Apr|    21-Jan|
|           SC25646|138127|HORIZON FRESH FRUIT| 17,100.00 |     USD|          ZAR|     15.5142|  17-Mar|    21-Jan|
|           SC25645|138129|HORIZON FRESH FRUIT| 17,100.00 |     USD|          ZAR|     15.5142|  17-Mar|    21-Jan|
+------------------+------+-------------------+-----------+--------+-------------+------------+--------+----------+

In [39]:
# ADD YEAR TO DATE COLUMNS
# The source dates carry only day and month (e.g. "7-Apr"), so append a year
# to each one to obtain a complete date string such as "7-Apr-2022".
# NOTE(review): the year is hard-coded - confirm it is right for every batch.
due_date = "due date"
issue_date = "issue date"

# Deliberately not named `year`: that would rebind the `year()` function
# brought in by `from pyspark.sql.functions import *` for all later cells.
year_suffix = F.lit("-2022")
due_date_add_year = F.concat(F.col(due_date), year_suffix)
issue_date_add_year = F.concat(F.col(issue_date), year_suffix)

# Overwrite both date columns with their year-suffixed versions.
source_df = (
    source_df
    .withColumn(due_date, due_date_add_year)
    .withColumn(issue_date, issue_date_add_year)
)
    


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
# FORMAT DATES
# Parse the "day-month-year" strings built in the previous cell (e.g.
# "7-Apr-2022") into real dates; every other column is passed through as is.
# The pattern uses the conventional four-letter year and is defined once so
# both date columns are guaranteed to be parsed the same way.
date_pattern = "d-MMM-yyyy"

# No backslash continuations: the expression is already inside parentheses.
source_df = source_df.select(
    F.col("external reference"),
    F.col("number"),
    F.col("counterparty"),
    F.col("amount due"),
    F.col("currency"),
    F.col("cost currency"),
    F.col("costing rate"),
    F.to_date(F.col("due date"), date_pattern).alias("due date"),
    F.to_date(F.col("issue date"), date_pattern).alias("issue date"),
)

# Persist the result as CSV, replacing whatever is already at the output path.
source_df.write.csv(path='s3://sol-dev-output/TruCape-Invoices', mode='overwrite')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
# Preview the final frame: the two date columns are now rendered as
# yyyy-MM-dd values (see output).
source_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+------+-------------------+-----------+--------+-------------+------------+----------+----------+
|external reference|number|       counterparty| amount due|currency|cost currency|costing rate|  due date|issue date|
+------------------+------+-------------------+-----------+--------+-------------+------------+----------+----------+
|           SC25648|138099|         JSC TANDER| 21,124.80 |     USD|          ZAR|     15.5654|2022-04-07|2022-01-21|
|           SC25646|138127|HORIZON FRESH FRUIT| 17,100.00 |     USD|          ZAR|     15.5142|2022-03-17|2022-01-21|
|           SC25645|138129|HORIZON FRESH FRUIT| 17,100.00 |     USD|          ZAR|     15.5142|2022-03-17|2022-01-21|
+------------------+------+-------------------+-----------+--------+-------------+------------+----------+----------+