
## Overview

This notebook shows you how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html), the Databricks File System, lets you store data so that it can be queried from inside Databricks. This notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
# Where the uploaded file lives in DBFS, and which reader format to use
file_location = "/FileStore/tables/simulated_transaction_2024.csv"
file_type = "csv"

# Reader settings for CSV input
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# These options only apply to CSV; other file types ignore them.
csv_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}
df = (
    spark.read
    .format(file_type)
    .options(**csv_options)
    .load(file_location)
)

# Register the raw data as a temporary view so SQL cells can query it
temp_table_name = "lloyds_data_two"
df.createOrReplaceTempView(temp_table_name)

display(df)

Date,Timestamp,Account No,Balance,Amount,Third Party Account No,Third Party Name
01/01/2023,00:00,678330503.0,2971.0,1584.0,,Westport Care Home
01/01/2023,00:00,472213568.0,3792.0,1950.0,,Barbiee Boutique
01/01/2023,00:00,472213568.0,3012.0,-780.0,283027736.0,
01/01/2023,00:00,283027736.0,1787.0,780.0,472213568.0,
01/01/2023,00:00,624500124.0,3226.0,1825.0,,Fat Face
01/01/2023,00:00,203466392.0,4607.66,2841.66,,Lavender Primary
01/01/2023,00:00,768271776.0,3620.0,1950.0,,A Cut Above
01/01/2023,00:00,768271776.0,2840.0,-780.0,215404070.0,
01/01/2023,00:00,215404070.0,1965.0,780.0,768271776.0,
01/01/2023,00:00,456221621.0,2831.0,1675.0,,Tesco


In [0]:
from pyspark.sql.functions import col, when, lower, regexp_replace, to_date, date_format

# File location and type
file_location = "/FileStore/tables/simulated_transaction_2024.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# Read the raw CSV. Schema inference is off, so the numeric columns are cast
# explicitly further down.
raw_df = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
    .load(file_location)
)

# Snake-case names applied positionally to the seven source columns
new_column_names = ["date_id", "time_stamp", "account_id", "balance", "amount", "external_account_id", "external_account_desc"]

# NOTE(review): an earlier draft normalised a `transaction_type` column at this
# point; this file has no such column, so that commented-out step was removed.
df = (
    raw_df.toDF(*new_column_names)
    # Strip a trailing ".0" from account_id. This is done on the string itself
    # rather than via cast("integer") inside when/otherwise: that mixed an
    # integer branch with a string branch (the result was coerced back to
    # string anyway) and nulled any value the integer cast could not parse.
    .withColumn("account_id", regexp_replace(col("account_id"), r"\.0$", ""))
    # Numeric columns arrive as strings; float is kept so the output schema is
    # unchanged. NOTE(review): consider a decimal type for currency values.
    .withColumn("amount", col("amount").cast("float"))
    .withColumn("balance", col("balance").cast("float"))
    # Parse dd/MM/yyyy, then re-emit as a yyyyMMdd string key
    .withColumn("date_id", date_format(to_date(col("date_id"), "dd/MM/yyyy"), "yyyyMMdd"))
    # Fix the column order explicitly
    .select("date_id", "time_stamp", "account_id", "balance", "amount", "external_account_id", "external_account_desc")
)

# NOTE(review): external_account_id is left exactly as read from the file, so
# it may still carry a trailing ".0" — confirm before joining it to account_id.

temp_table_name = "clean_lloyds_data_two"
df.createOrReplaceTempView(temp_table_name)

# Display the DataFrame
display(df)
df.printSchema()


date_id,time_stamp,account_id,balance,amount,external_account_id,external_account_desc
20230101.0,00:00,678330503.0,2971.0,1584.0,,Westport Care Home
20230101.0,00:00,472213568.0,3792.0,1950.0,,Barbiee Boutique
20230101.0,00:00,472213568.0,3012.0,-780.0,283027736.0,
20230101.0,00:00,283027736.0,1787.0,780.0,472213568.0,
20230101.0,00:00,624500124.0,3226.0,1825.0,,Fat Face
20230101.0,00:00,203466392.0,4607.66,2841.66,,Lavender Primary
20230101.0,00:00,768271776.0,3620.0,1950.0,,A Cut Above
20230101.0,00:00,768271776.0,2840.0,-780.0,215404070.0,
20230101.0,00:00,215404070.0,1965.0,780.0,768271776.0,
20230101.0,00:00,456221621.0,2831.0,1675.0,,Tesco


root
 |-- date_id: string (nullable = true)
 |-- time_stamp: string (nullable = true)
 |-- account_id: string (nullable = true)
 |-- balance: float (nullable = true)
 |-- amount: float (nullable = true)
 |-- external_account_id: string (nullable = true)
 |-- external_account_desc: string (nullable = true)



In [0]:
%sql
-- Distinct third-party names on rows that have no third-party account number
SELECT DISTINCT external_account_desc
FROM clean_lloyds_data_two
WHERE external_account_id IS NULL

external_account_desc
Cass Art
Rose & Crown
Frankie & Bennies
Fat Face
Sunny Care Nursery
A Cut Above
Etsy
Pets Corner
Premier Finance
RugbyFields


In [0]:
%sql
-- Distinct (account number, name) pairs whose name contains 'Rose & Crown'
SELECT DISTINCT external_account_id, external_account_desc
FROM clean_lloyds_data_two
WHERE external_account_desc LIKE '%Rose & Crown%'

external_account_id,external_account_desc
,Rose & Crown


In [0]:
%sql
-- external_account_id is a string column and its raw values can carry a
-- trailing ".0", so cast it explicitly instead of relying on an implicit
-- string-to-number comparison against the integer literal.
SELECT DISTINCT external_account_id, external_account_desc
FROM clean_lloyds_data_two
WHERE CAST(external_account_id AS DOUBLE) = 426466211

external_account_id,external_account_desc
426466211,


In [0]:
# Name of the table the cleaned DataFrame is saved to
new_transactional_data = "simulated_transaction_2024_csv"

# Clear any files left at this table's warehouse location before saving.
# The path is derived from the table name so the clean-up always targets the
# table written below; it previously pointed at an unrelated directory
# (lloyds_raw_data_table).
# NOTE(review): assumes the default Hive warehouse root — confirm for this workspace.
dbutils.fs.rm(f"dbfs:/user/hive/warehouse/{new_transactional_data}/", True)

# Save as a Parquet-backed table, replacing any existing version
df.write.mode("overwrite").format("parquet").saveAsTable(new_transactional_data)