In [None]:
from datetime import datetime, timezone
from io import StringIO

from notebookutils import mssparkutils
import numpy as np
import pandas as pd

In [None]:
# ADLS Gen2 endpoint shared by every container used in this notebook.
# Declared once so the landing and curated URLs cannot drift apart.
storage_endpoint = "storageXtarget.dfs.core.windows.net"

# Container the source CSV is read from
container_landing = f"abfs://landing@{storage_endpoint}"
# Container the Parquet output is written to
container_curated = f"abfs://curated@{storage_endpoint}"

# Folder of this flow; the same name is used under both containers
flow_folder_path = "Flux_flow-1"

# Retrieve the most recent extraction date folder and the file it contains

In [None]:
# Root folder of this flow in the landing container
extraction_path = f"{container_landing}/{flow_folder_path}/"

list_dates = mssparkutils.fs.ls(extraction_path)
if not list_dates:
    # Fail with an explicit message instead of a bare IndexError further down
    raise FileNotFoundError(f"No extraction folder found under {extraction_path}")

# The entry with the greatest name is taken as the latest extraction
# (assumes folder names sort chronologically, e.g. ISO dates — TODO confirm)
last_date_path = max(list_dates, key=lambda entry: entry.name).path

landing_files = mssparkutils.fs.ls(last_date_path)
if not landing_files:
    raise FileNotFoundError(f"No file found under {last_date_path}")

# The first listed entry is used as the landing file
# NOTE(review): presumably each date folder holds a single CSV — confirm
file_path = landing_files[0].path

print("landing file path:")
print(file_path)
print()

# Reading CSV file from landing as pandas dataframe

In [None]:
# Load the landing file as-is: no row is treated as a header
df_flow1 = pd.read_csv(file_path, header=None)

# Serialise the frame straight back to CSV text (no header row, no index);
# this round trip is intended to get rid of the double quotes
csv_flow1 = df_flow1.to_csv(header=False, index=False)

# Strip the semicolon that terminates each row
csv_flow1_clean = csv_flow1.replace(";\n", "\n")

# Show the cleaned text, followed by a blank line
print("Clean CSV :", csv_flow1_clean, "", sep="\n")

In [None]:
# Columns to keep from the cleaned CSV (matched against its header row)
flow1_cols = [
    "Entity",
    "spf_mean", "spf_score", "spf_count",
    "wp_mean", "wp_score", "wp_count",
]

# Wrap the cleaned text in an in-memory buffer so pandas can read it like a file
csv_file = StringIO(csv_flow1_clean)

# Parse the semicolon-separated text, keeping only the expected columns and
# never using the first column as the index
df_flow1_clean = pd.read_csv(
    csv_file,
    sep=";",
    index_col=False,
    usecols=flow1_cols,
)

# Adding Pole column:

In [None]:
# Entity -> pole, for entities that roll up into a pole other than themselves
entity_mapping = {
    "X STADIUM" : "X CONCESSIONS",
    "X AIRPORTS" : "X CONCESSIONS",
    "X RAILWAYS" : "X CONCESSIONS",
    "X CONCESSIONS HOLDING" : "X CONCESSIONS",
    "X HIGHWAYS" : "X CONCESSIONS"
}

# Entities listed in the mapping take the pole declared there; every other
# entity is its own pole. The pole is looked up in the mapping rather than
# hard-coded, so an entry pointing to another pole is honoured.
entity_col = df_flow1_clean["Entity"]
df_flow1_clean["Pole"] = entity_col.map(entity_mapping).fillna(entity_col)

# Mapping of the dataset

In [None]:
# Output column order: pole and entity first, then the spf_* and wp_* metrics
column_names = [
    "Pole", "Entity",
    "spf_mean", "spf_score", "spf_count",
    "wp_mean", "wp_score", "wp_count",
]

# Select and order the columns of the final dataset
df_flow1_mapped = df_flow1_clean.loc[:, column_names]

# Getting the current date for snapshot

In [None]:
# Current time in UTC, kept as a naive datetime exactly like the deprecated
# datetime.utcnow() (deprecated since Python 3.12) used to return
now = datetime.now(timezone.utc).replace(tzinfo=None)

# Snapshot label (YYYY-MM-DD) used as the partition folder name
snapshot = now.strftime("%Y-%m-%d")

# Save CSV file in Parquet format

In [None]:
# Base name of the landing file, without its ".csv" extension.
# Only a trailing ".csv" is removed; occurrences elsewhere in the name are kept.
file_name = file_path.split("/")[-1]
if file_name.endswith(".csv"):
    file_name = file_name[: -len(".csv")]

# Destination: <curated>/<flow>/flow-1.parquet/SNAPSHOT=<date>/<name>.snappy.parquet
flow1_curated_path = (
    f"{container_curated}/{flow_folder_path}/flow-1.parquet/"
    f"SNAPSHOT={snapshot}/{file_name}.snappy.parquet"
)
print("curated file path:")
print(flow1_curated_path)
print()

# Write the mapped dataset as Parquet, without the DataFrame index
df_flow1_mapped.to_parquet(flow1_curated_path, index=False)