In [18]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("CSV Reader").getOrCreate()
df_accounts = spark.read.csv("accounts.csv", header=True, inferSchema=True)
df_accounts.createOrReplaceTempView("accounts")
result1 = spark.sql("""
    SELECT *
    FROM accounts 
""")

result1.show()


+---------------------------------------------+
|"id";"first_name";"last_name";"age";"country"|
+---------------------------------------------+
|                         1;Darcy;Phillips;...|
|                         2;Amelia;Wright;6...|
|                          3;Haris;Ellis;61;CR|
|                            4;Tony;Hall;51;JO|
|                         5;Rubie;Stewart;2...|
|                          6;Miley;Perry;27;ZA|
|                         7;Marcus;Carter;6...|
|                         8;Charlie;Harris;...|
|                         9;Honey;Rogers;60;IL|
|                         10;Luke;Harris;66;IR|
|                         11;Spike;Murphy;5...|
|                         12;Vincent;Adams;...|
|                         13;James;Barnes;5...|
|                         14;George;Bailey;...|
|                         15;Sienna;Holmes;...|
|                         16;Isabella;Ellio...|
|                         17;Freddie;Martin...|
|                         18;Kate;Wright

In [20]:
df_transactions = spark.read.csv("transactions.csv", header=True, inferSchema=True)
df_transactions.createOrReplaceTempView("transactions")
account_type_counts = spark.sql("""
    SELECT *
    FROM transactions 
""")

account_type_counts.show()



+---------------------------------------------------------+
|"id";"amount";"account_type";"transaction_date";"country"|
+---------------------------------------------------------+
|                                     179528;-730.86;Bu...|
|                                     378343;-946.98;Pe...|
|                                     75450;7816.92;Pro...|
|                                     357719;704.02;Bus...|
|                                     110511;3462.6;Per...|
|                                     461830;762.81;Pro...|
|                                     30180;5390.24;Pro...|
|                                     65398;4765.77;Per...|
|                                     170899;8775.89;Bu...|
|                                     234300;8455.18;Pr...|
|                                     208027;6244.1;Bus...|
|                                     161212;5904.56;Pe...|
|                                     105372;4079.76;Pr...|
|                                     20

In [21]:
df_transactions = spark.read.option("delimiter", ";").option("header", "true").option("inferSchema", "true").csv("transactions.csv")
df_transactions.show()

+------+-------+------------+----------------+-------+
|    id| amount|account_type|transaction_date|country|
+------+-------+------------+----------------+-------+
|179528|-730.86|    Business|      2013-07-10|     SV|
|378343|-946.98|    Personal|      2018-04-06|     YE|
| 75450|7816.92|Professional|      2016-11-20|     SI|
|357719| 704.02|    Business|      2016-11-06|     ID|
|110511| 3462.6|    Personal|      2018-01-18|     BS|
|461830| 762.81|Professional|      2017-06-20|     CN|
| 30180|5390.24|Professional|      2021-05-26|     GN|
| 65398|4765.77|    Personal|      2018-05-01|     TR|
|170899|8775.89|    Business|      2013-10-16|     SK|
|234300|8455.18|Professional|      2015-10-06|     LU|
|208027| 6244.1|    Business|      2020-03-06|     AE|
|161212|5904.56|    Personal|      2016-09-07|     EG|
|105372|4079.76|Professional|      2015-02-12|     MT|
|205321| 3570.4|Professional|      2012-07-02|     MU|
|410863|2328.83|    Business|      2012-12-20|     SR|
|486752| 5

In [23]:
df_transactions.createOrReplaceTempView("transactions")
account_type_counts = spark.sql("""
    SELECT account_type, COUNT(DISTINCT id) as account_type_count 
    FROM transactions 
    GROUP BY account_type
""")

account_type_counts.show()


+------------+------------------+
|account_type|account_type_count|
+------------+------------------+
|    Personal|            481997|
|Professional|            482170|
|    Business|            482350|
+------------+------------------+



In [25]:
result = spark.sql("""
    SELECT id, 
           SUM(amount) as balance, 
           MAX(transaction_date) as latest_date 
    FROM transactions 
    GROUP BY id
""")

# Show the results
result.show()


+------+------------------+-----------+
|    id|           balance|latest_date|
+------+------------------+-----------+
|482333|          27174.07| 2020-07-17|
|222048|          48004.81| 2020-07-20|
|328078|          36948.25| 2020-02-01|
|192401|36736.979999999996| 2020-01-30|
|273916| 47475.37999999999| 2021-05-30|
|485103|          62198.93| 2021-05-22|
|300282|          55103.62| 2021-05-01|
| 20683| 56448.72000000001| 2021-10-27|
| 15846|58671.909999999996| 2020-12-23|
|446783|          98085.51| 2021-12-11|
| 92182|           42335.3| 2020-08-08|
|477485|          22114.03| 2020-05-23|
|171142|40428.899999999994| 2021-04-07|
|317762|          40025.55| 2021-12-02|
| 65478|           57941.9| 2021-10-06|
|306768|          26566.93| 2019-12-19|
|380411|          43652.94| 2020-06-02|
|304681|          37827.69| 2021-03-26|
|475638|44509.100000000006| 2021-11-23|
| 97413|          39611.24| 2018-05-01|
+------+------------------+-----------+
only showing top 20 rows



In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, year, concat_ws

# Initialize Spark session
spark = SparkSession.builder \
    .appName("SwissEarningsCalculation") \
    .getOrCreate()

# Load the datasets from the current folder with semicolon delimiters and headers
accounts = spark.read.csv("accounts.csv", header=True, sep=";", inferSchema=True)
transactions = spark.read.csv("transactions.csv", header=True, sep=";", inferSchema=True)
country_abbreviations = spark.read.csv("country_abbreviation.csv", header=True, sep=";", inferSchema=True)

# Columns based on your descriptions:
# accounts: id, first_name, last_name, country
# transactions: id, amount, transaction_date
# country_abbreviations: abbreviation, country_full_name

# Filter Swiss users
swiss_accounts = accounts.join(country_abbreviations, accounts.country == country_abbreviations.abbreviation) \
    .filter(col("country_full_name") == "Switzerland")

# Filter transactions with positive value and extract year
transactions = transactions.filter(col("amount") > 0) \
    .withColumn("year", year(col("transaction_date")))

# Join accounts with transactions using "id" column
joined_data = transactions.join(swiss_accounts, transactions.id == swiss_accounts.id)

# Group by user full name and year to aggregate earnings
aggregated_data = joined_data.groupBy(
    concat_ws(" ", col("first_name"), col("last_name")).alias("full_name"),
    "year"
).agg(sum("amount").alias("earnings"))

# Pivot table by year
pivot_table = aggregated_data.groupBy("full_name").pivot("year").sum("earnings").na.fill(0)

pivot_table.show()

# Stop the spark session
spark.stop()


+----------------+--------+------------------+------------------+--------+--------+------------------+------------------+-------+------------------+------------------+------------------+
|       full_name|    2011|              2012|              2013|    2014|    2015|              2016|              2017|   2018|              2019|              2020|              2021|
+----------------+--------+------------------+------------------+--------+--------+------------------+------------------+-------+------------------+------------------+------------------+
|   Connie Gibson|     0.0|           8486.84|           3664.69| 7385.69|     0.0|           1717.48|           2436.81|    0.0|           5545.76|               0.0|               0.0|
|    Paige Taylor|     0.0|               0.0|           5618.18|     0.0|     0.0|           2784.89|           2634.62|    0.0|           9963.01|          14458.31|            436.22|
|  Adison Douglas|     0.0|          10622.48|          14322.94|

In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, when, percentile_approx

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Transaction Levels") \
    .getOrCreate()

def assign_transaction_level(transactions):
    # Calculate the 25th and 75th percentiles of the 'amount' column
    percentiles = transactions.approxQuantile("amount", [0.25, 0.75], 0.01)
    low_threshold, high_threshold = percentiles

    # Add the 'level' column based on the 'amount' column
    transactions_with_level = transactions.withColumn(
        "level",
        when(col("amount") >= high_threshold, "high")
        .when((col("amount") < high_threshold) & (col("amount") >= low_threshold), "average")
        .otherwise("low")
    )

    return transactions_with_level

# Load the transactions dataset with ';' delimiter and header
transactions = spark.read.csv("transactions.csv", header=True, inferSchema=True, delimiter=';')

# Process the dataset
result = assign_transaction_level(transactions)
result.show()

# Optionally, stop the Spark session
spark.stop()


TypeError: DataFrameReader.csv() got an unexpected keyword argument 'delimiter'

In [30]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, percentile_approx

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Transaction Levels") \
    .getOrCreate()

def assign_transaction_level(transactions):
    # Calculate the 25th and 75th percentiles of the 'amount' column
    percentiles = transactions.approxQuantile("amount", [0.25, 0.75], 0.01)
    low_threshold, high_threshold = percentiles

    # Add the 'level' column based on the 'amount' column
    transactions_with_level = transactions.withColumn(
        "level",
        when(col("amount") >= high_threshold, "high")
        .when((col("amount") < high_threshold) & (col("amount") >= low_threshold), "average")
        .otherwise("low")
    )

    return transactions_with_level

# Load the transactions dataset with ';' delimiter and header
transactions = spark.read.option("delimiter", ";").csv("transactions.csv", header=True, inferSchema=True)

# Process the dataset
result = assign_transaction_level(transactions)
result.show()

# Optionally, stop the Spark session
spark.stop()


+------+-------+------------+----------------+-------+-------+
|    id| amount|account_type|transaction_date|country|  level|
+------+-------+------------+----------------+-------+-------+
|179528|-730.86|    Business|      2013-07-10|     SV|    low|
|378343|-946.98|    Personal|      2018-04-06|     YE|    low|
| 75450|7816.92|Professional|      2016-11-20|     SI|   high|
|357719| 704.02|    Business|      2016-11-06|     ID|    low|
|110511| 3462.6|    Personal|      2018-01-18|     BS|average|
|461830| 762.81|Professional|      2017-06-20|     CN|    low|
| 30180|5390.24|Professional|      2021-05-26|     GN|average|
| 65398|4765.77|    Personal|      2018-05-01|     TR|average|
|170899|8775.89|    Business|      2013-10-16|     SK|   high|
|234300|8455.18|Professional|      2015-10-06|     LU|   high|
|208027| 6244.1|    Business|      2020-03-06|     AE|average|
|161212|5904.56|    Personal|      2016-09-07|     EG|average|
|105372|4079.76|Professional|      2015-02-12|     MT|a