# Create and/or add data to tvde_earnings_history table in the bronze layer

In [1]:
# Importing libraries
import os
import tkinter as tk
from tkinter import filedialog
import re
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, regexp_extract, when, to_date, unix_timestamp, from_unixtime, concat_ws, lpad, create_map
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType


In [2]:
# layer and table
layer = "bronze"
table = "tvde_earnings_history"
catalog_name = "toll_reconciliation_tool"

# Define the desired warehouse location explicitly
project_dir = "C:/Users/renat/Documents/imgPdados-finance-uber/toll-reconciliation-tool/spark-warehouse"
warehouse_location = "file:///" + os.path.abspath(project_dir)
print(f"Setting Spark warehouse to: {warehouse_location}")

# Change the current working directory
os.chdir(project_dir)
print(f"Changed current working directory to: {os.getcwd()}")

Setting Spark warehouse to: file:///C:\Users\renat\Documents\imgPdados-finance-uber\toll-reconciliation-tool\spark-warehouse
Changed current working directory to: C:\Users\renat\Documents\imgPdados-finance-uber\toll-reconciliation-tool\spark-warehouse


In [3]:
# --- Functions (select_files, add_files_to_list, normalize_spark_dataframe) ---
def select_files():
    """Opens a file selection dialog and returns a list with the paths of the selected files."""
    root = tk.Tk() # Create a Tkinter window
    root.withdraw() # Hide the main window
    file_paths = filedialog.askopenfilenames(
        title="Select files",  # Window title
    )
    return list(file_paths) # Convert the returned tuple to a list

def add_files_to_list(file_list):
    """Adds the paths of the selected files to the list."""
    selected_file_paths = select_files()
    if selected_file_paths: # Check if the user selected any files
        file_list.extend(selected_file_paths)  # Add the paths to the list
        print("Files added:")
        for file_path in selected_file_paths:
            print(file_path)
    else:
        print("No files selected.")


def normalize_spark_dataframe(df, filename):
    """Normalizes a Spark DataFrame according to specified rules, using keyword lists."""
    
    # Define lists of keywords for toll, service fee, and tip
    toll_words = ["pedágio"]
    service_fee_words = ["taxa de serviço"]
    tip_words = ["valor extra"]

    # Extract year and month from filename
    year_match = re.search(r'\d{4}', filename)
    month_match = re.search(
        r'(jan|fev|mar|abr|mai|jun|jul|ago|set|out|nov|dez)', filename, re.IGNORECASE
    )

    # Extract the year if found, otherwise set to None (as a Spark literal).
    year = lit(year_match.group(0)) if year_match else lit(None)

    # Extract the lowercase month string if found, otherwise set to None (as a Spark literal).
    month_str = lit(month_match.group(0).lower()) if month_match else lit(None)

    # Define a mapping between short Portuguese month names and their numerical equivalents.
    month_number_map = {
        'jan': '01', 'fev': '02', 'mar': '03', 'abr': '04', 'mai': '05', 'jun': '06',
        'jul': '07', 'ago': '08', 'set': '09', 'out': '10', 'nov': '11', 'dez': '12'
    }

    # Convert the extracted month string to its numerical equivalent using the mapping.
    # If the month string is not None, get the corresponding value from the map; otherwise, set to None.
    month_number = when(month_str.isNotNull(),
                    create_map([col for pair in [(lit(k), lit(v)) for k, v in month_number_map.items()] for col in pair]).getItem(month_str)
                    ).otherwise(lit(None))

    # Create a new DataFrame with normalized columns.

        # Debugging: Print types of year and month_number
    print(f"Type of year: {type(year)}")
    print(f"Type of month_number: {type(month_number)}")

    try:
        # Debugging: Print types of year and month_number (moved inside try)
        print(f"Inside try block:")  # Indicate we're inside the try block
        print(f"Type of year: {type(year)}")
        print(f"Type of month_number: {type(month_number)}")

        normalized_df = df.withColumn("DataInput", lit(datetime.now())) \
            .withColumn("Date",
                # Extract the date part using a regular expression and format it as YYYY/MM/DD.
                # It handles cases where the day of the week (Portuguese or English) might be present.
                when(regexp_extract(col("Extracted Text"),
                                r'(seg|ter|qua|qui|sex|sáb|dom|Mon|Tue|Wed|Thu|Fri|Sat|Sun)?,? (\d{2}) de ([a-z]{3})', 3
                                ).isNotNull(),
                    concat_ws("/", year.cast("string"), month_number.cast("string"),
                            regexp_extract(col("Extracted Text"),
                                        r'(seg|ter|qua|qui|sex|sáb|dom|Mon|Tue|Wed|Thu|Fri|Sat|Sun)?,? (\d{2}) de ([a-z]{3})', 2
                                        ).cast("string")
                            )
                    ).otherwise(concat_ws("/", year.cast("string"), month_number.cast("string"), lit(None).cast("string")).cast("string"))
                ) \
            .withColumn("Earnings",
                # Extract the earnings value (preceded by '€') and cast it to a double.
                regexp_extract(col("Extracted Text"), r'€\s*([\d,.]+)', 1).cast("double")) \
            .withColumn("Toll",
                # Check if any of the toll keywords are present in the 'Extracted Text'.
                when(col("Extracted Text").rlike(rf'(?i)([\d,.]+)\s*({"|".join(toll_words)})'),  # Corrected line
                    # If a toll keyword is found, extract the corresponding numeric value and cast it to a double (case-insensitive).
                    regexp_extract(col("Extracted Text"), rf'(?i)([\d,.]+)\s*({"|".join(toll_words)})', 1).cast("double")  # Corrected line
                    ).otherwise(lit(None))) \
            .withColumn("ServiceFee",
                # Check if any of the service fee keywords are present (preceded by '€').
                when(col("Extracted Text").rlike(rf'(?i)€\s*([\d,.]+)\s*({"|".join(service_fee_words)})'),  # Corrected line
                    # If a service fee keyword is found, extract the numeric value and cast it to a double (case-insensitive).
                    regexp_extract(col("Extracted Text"), rf'(?i)€\s*([\d,.]+)\s*({"|".join(service_fee_words)})', 1).cast("double")  # Corrected line
                    ).otherwise(lit(None))) \
            .withColumn("Tip",
                # Check if any of the tip keywords are present.
                when(col("Extracted Text").rlike(rf'(?i)([\d,.]+)\s*({"|".join(tip_words)})'),  # Corrected line
                    # If a tip keyword is found, extract the corresponding numeric value and cast it to a double (case-insensitive).
                    regexp_extract(col("Extracted Text"), rf'(?i)([\d,.]+)\s*({"|".join(tip_words)})', 1).cast("double")  # Corrected line
                    ).otherwise(lit(None))) \
            .withColumn("StartTime",
                # Extract the start time in various formats (HH-MM, HH.MM, HH*MM) and standardize it to HH:MM.
                when(regexp_extract(col("Extracted Text"), r'(\d{2}-\d{2}|\d{2}\.\d{2}|\d{2}\*\d{2})',
                                    1).rlike(r'\d{2}[-.:*]\d{2}'),
                    regexp_extract(col("Extracted Text"), r'(\d{2})[-\.\*](\d{2})', 1) + ":" + regexp_extract(
                        col("Extracted Text"), r'\d{2}[-\.\*](\d{2})', 2)
                    ).otherwise(lit(None))) \
            .withColumn("TotalTime",
                # Extract total time in the format 'digits min digits seg' and format it as MM:SS with leading zeros.
                when(regexp_extract(col("Extracted Text"), r'(\d+)\s*min\s*(\d+)\s*seg', 1).isNotNull(),
                    concat_ws(":",
                            # Left-pad the minutes with '0' to ensure two digits.
                            lpad(regexp_extract(col("Extracted Text"), r'(\d+)\s*min', 1), 2, "0"),
                            # Left-pad the seconds with '0' to ensure two digits.
                            lpad(regexp_extract(col("Extracted Text"), r'(\d+)\s*seg', 2), 2, "0")
                            )
                    ).otherwise(lit(None))) \
            .withColumn("Distance",
                # Extract the distance value (followed by ' km') and cast it to a double.
                regexp_extract(col("Extracted Text"), r'([\d,.]+)\s*km', 1).cast("double"))
        return normalized_df

    except Exception as e:
        print(f"Error in normalize_spark_dataframe: {e}")
        print(f"Type of year (in except): {type(year)}")
        print(f"Type of month_number (in except): {type(month_number)}")
        import traceback
        traceback.print_exc()  # Print the full traceback
        return None  # Or raise the exception if you want the program to stop



In [4]:
# Initialize Spark session with the specified warehouse directory
spark = SparkSession.builder \
    .appName(catalog_name) \
    .config("spark.sql.warehouse.dir", warehouse_location) \
    .config("hive.metastore.warehouse.dir", warehouse_location) \
    .enableHiveSupport() \
    .getOrCreate()


# Confirming that the session is working
if spark:
    print("Spark session started successfully!")
    print(f"Application name: {spark.sparkContext.appName}")
else:
    print("Failed to start Spark session.")


Spark session started successfully!
Application name: toll_reconciliation_tool


In [5]:
try:
    # Attempt a basic Hive operation
    spark.sql("SHOW DATABASES").show()

except Exception as e:
    print(f"Error: {e}")



+---------+
|namespace|
+---------+
|   bronze|
|  default|
|     gold|
|   silver|
+---------+



In [6]:

# Print Metastore URI for debugging
print(f"Metastore URI: {spark.conf.get('hive.metastore.uris', 'default')}")  # Get with default to avoid error

# Check if the 'bronze' schema exists
result = spark.sql("SHOW SCHEMAS").collect()
schemas = [row[0] for row in result]
print (schemas)


# Tests to find the bronze layer
if "bronze" not in schemas:
    print("Error: The 'bronze' schema does not exist. Please create it.")  # error for not finding the bronze layer

else:
    print("The 'bronze' schema exists.")
    # Switch to the bronze layer
    spark.sql(f"USE {layer}")



Metastore URI: default
['bronze', 'default', 'gold', 'silver']
The 'bronze' schema exists.


In [7]:
# Selecting Files:
filepath_list = [] # Create an empty list
add_files_to_list(filepath_list) # Call the function to add files and print the final list


Files added:
C:/Users/renat/Documents/imgPdados-finance-uber/toll-reconciliation-tool/2023-dezembro.csv


In [8]:
# Define the schema for your CSV files (adjust based on your actual file structure)
# Assuming your CSV has a single column named 'Extracted Text'
schema = StructType([
    StructField("Extracted Text", StringType(), True)
])

In [9]:
# Process and load data using Spark
for filepath in filepath_list:
    try:
        # Read the CSV file directly into a Spark DataFrame
        df = spark.read.csv(filepath, encoding='utf-8', schema=schema)
        # Normalize the Spark DataFrame using the normalize_spark_dataframe function
        normalized_df = normalize_spark_dataframe(df, os.path.basename(filepath))

        # Instead of writing to Delta, print the normalized DataFrame to the console
        print(f"\n--- Normalized DataFrame for file: {filepath} ---")
        normalized_df.show(truncate=False) # Use truncate=False to show full content

        """
        # Write the normalized Spark DataFrame to a Delta table
        normalized_df.write \
            .format('delta') \
            .mode('append') \
            .option('mergeSchema', 'true') \
            .saveAsTable(f'{layer}.{table}')
        print(f"Successfully processed and loaded {filepath} into {layer}.{table}")
        """

    except Exception as e:
        print(f"An error occurred while processing {filepath}: {e}")

print(f"All {len(filepath_list)} files processed and loaded into '{layer}.{table}'.")




Type of year: <class 'pyspark.sql.column.Column'>
Type of month_number: <class 'pyspark.sql.column.Column'>
Inside try block:
Type of year: <class 'pyspark.sql.column.Column'>
Type of month_number: <class 'pyspark.sql.column.Column'>

--- Normalized DataFrame for file: C:/Users/renat/Documents/imgPdados-finance-uber/toll-reconciliation-tool/2023-dezembro.csv ---
An error occurred while processing C:/Users/renat/Documents/imgPdados-finance-uber/toll-reconciliation-tool/2023-dezembro.csv: An error occurred while calling o176.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (Renato-Laptop-LenovoIdeapad320-SSD executor driver): org.apache.spark.SparkRuntimeException: [INVALID_PARAMETER_VALUE.REGEX_GROUP_INDEX] The value of parameter(s) `idx` in `regexp_extract` is invalid: Expects group index between 0 and 1, but got 2.
	at org.apache.spark.sql.errors.QueryExecutionEr

In [0]:
%sql
DESC DETAIL bronze.tvde_earnings_history

In [0]:
%sql
select *
from bronze.tvde_earnings_history

In [None]:
# Stop the Spark session
spark.stop()