In [1]:
import pyspark  

# Start a Spark context on the local machine ('local[*]' master)
sc = pyspark.SparkContext(master='local[*]')

# Smoke test: distribute the integers 0..999 and pull back a sample of 5
# (sampling without replacement; no seed is passed, so the values differ per run)
rdd = sc.parallelize(range(1000))

rdd.takeSample(withReplacement=False, num=5)

[13, 348, 900, 238, 291]

In [2]:
# IPython shell escape: list the files in the current working directory
!ls

account.csv  country_abbreviation.csv  transactions.csv  Untitled.ipynb  work


In [6]:
from pyspark.sql import SparkSession

# Build the SparkSession (or reuse an already-running one) under a descriptive app name
spark = (
    SparkSession.builder
    .appName("CSV File Count Example")
    .getOrCreate()
)

In [9]:
def _read_csv_and_report(csv_name):
    """Read a CSV that has a header row, print its row count, return (DataFrame, count)."""
    frame = spark.read.csv(csv_name, header=True)
    n_rows = frame.count()
    print(f"The {csv_name} file has {n_rows} rows.")
    return frame, n_rows


# Load each input file in turn; the DataFrames and counts stay available to later cells
account_df, account_count = _read_csv_and_report("account.csv")
transactions_df, transactions_count = _read_csv_and_report("transactions.csv")
country_abbreviation_df, country_abbreviation_count = _read_csv_and_report("country_abbreviation.csv")

The account.csv file has 5 rows.
The transactions.csv file has 10 rows.
The country_abbreviation.csv file has 5 rows.


In [8]:
# Display the row count of account.csv (assigned where account.csv is read and counted)
account_count

5

In [10]:

# Spark SQL 2: number of distinct accounts per account type
transactions_df = spark.read.csv("transactions.csv", header=True)

# Expose the DataFrame to Spark SQL under the view name "transactions"
transactions_df.createOrReplaceTempView("transactions")

# COUNT(DISTINCT account_id) makes an account with several transactions
# contribute only once to its account_type
account_type_query = (
    "SELECT account_type, COUNT(DISTINCT account_id) as account_type_count "
    "FROM transactions "
    "GROUP BY account_type"
)
account_type_count_df = spark.sql(account_type_query)

# Print the per-type counts
account_type_count_df.show()

+------------+------------------+
|account_type|account_type_count|
+------------+------------------+
|    checking|                 2|
|     savings|                 3|
+------------+------------------+



In [12]:
from pyspark.sql import functions as F

# Load the transactions and give the two columns used below proper types:
# amount becomes a float, transaction_date becomes a date
transactions_df = (
    spark.read.csv("transactions.csv", header=True)
    .withColumn("amount", F.col("amount").cast("float"))
    .withColumn("transaction_date", F.col("transaction_date").cast("date"))
)

# (Re)register the typed DataFrame as the "transactions" temporary view
transactions_df.createOrReplaceTempView("transactions")

# Per account: balance = sum of all amounts, latest_date = most recent transaction date
balance_query = """
    SELECT 
        account_id, 
        SUM(amount) as balance, 
        MAX(transaction_date) as latest_date
    FROM transactions
    GROUP BY account_id
    """

# The balance is reported as a string, which the original notebook states is a requirement
result_df = spark.sql(balance_query).withColumn(
    "balance", F.col("balance").cast("string")
)

# Print one row per account
result_df.show()

+----------+-------+-----------+
|account_id|balance|latest_date|
+----------+-------+-----------+
|         3|  370.0| 2023-07-21|
|         5|  200.0| 2023-07-30|
|         1|  150.0| 2023-07-05|
|         4|  800.0| 2023-07-25|
|         2|  400.0| 2023-07-12|
+----------+-------+-----------+



In [14]:
account_df = spark.read.csv("account.csv", header=True)

# Load the transactions, cast amount/transaction_date to usable types and
# derive the calendar year of each transaction
transactions_df = (
    spark.read.csv("transactions.csv", header=True)
    .withColumn("amount", F.col("amount").cast("float"))
    .withColumn("transaction_date", F.col("transaction_date").cast("date"))
    .withColumn("year", F.year("transaction_date"))
)

# Keep the accounts of one country.
# NOTE(review): the variable names and the original comment ("CHE" stands for
# Switzerland) say this selects Swiss users, but the filter value is "USA".
# Confirm which country code is intended before relying on this result.
swiss_users_df = account_df.filter(account_df.country == "USA")

# Attach each transaction to its account holder and build a display name
joined_df = (
    transactions_df
    .join(swiss_users_df, transactions_df.account_id == swiss_users_df.id)
    .withColumn("full_name", F.concat_ws(" ", "first_name", "last_name"))
)

# Earnings are the transactions with a strictly positive amount
positive_transactions_df = joined_df.filter(joined_df.amount > 0)

# One row per user, one column per year, each cell the summed earnings
pivot_df = (
    positive_transactions_df
    .groupBy("full_name")
    .pivot("year")
    .sum("amount")
)

# Print the pivoted earnings table
pivot_df.show()

+---------+-----+
|full_name| 2023|
+---------+-----+
| John Doe|200.0|
+---------+-----+



In [15]:
def add_transaction_level(spark, transactions_df):
    """Label every transaction as "high", "average" or "low" by its amount.

    The 25th and 75th percentiles of ``amount`` are estimated with
    ``approxQuantile`` (relative error 0.01) and used as thresholds:

    * ``amount > 75th percentile``                    -> "high"
    * ``25th percentile < amount <= 75th percentile`` -> "average"
    * anything else, including a null amount          -> "low"

    Args:
        spark: Not used by this function; kept so existing calls keep working.
        transactions_df: DataFrame with an ``amount`` column that can be cast
            to float.

    Returns:
        A new DataFrame with ``amount`` cast to float and an added ``level``
        column. The input DataFrame is not modified.
    """
    # Convert amount to float if it's not already
    transactions_df = transactions_df.withColumn("amount", F.col("amount").cast("float"))

    # Estimate the 25th and 75th percentiles
    quantiles = transactions_df.approxQuantile("amount", [0.25, 0.75], 0.01)
    if len(quantiles) < 2:
        # approxQuantile ignores nulls and returns an empty list when no
        # non-null amount exists (empty or all-null input), so there are no
        # thresholds. Fall back to the same label the regular path gives to
        # rows matching no threshold instead of failing with an IndexError.
        return transactions_df.withColumn("level", F.lit("low"))
    lower_quantile, upper_quantile = quantiles

    amount = F.col("amount")
    # Branches are evaluated in order, so the second one is only reached when
    # amount <= upper_quantile; the upper bound need not be repeated there.
    level = (
        F.when(amount > upper_quantile, "high")
        .when(amount > lower_quantile, "average")
        .otherwise("low")
    )

    # Return the DataFrame with the additional "level" column
    return transactions_df.withColumn("level", level)



In [17]:
# Print the current transactions DataFrame, the input passed to add_transaction_level below
transactions_df.show()

+----------+------+------------+----------------+----+
|account_id|amount|account_type|transaction_date|year|
+----------+------+------------+----------------+----+
|         1| 200.0|     savings|      2023-07-01|2023|
|         1| -50.0|     savings|      2023-07-05|2023|
|         2| 500.0|    checking|      2023-07-10|2023|
|         2|-100.0|    checking|      2023-07-12|2023|
|         3| 400.0|     savings|      2023-07-20|2023|
|         3| -30.0|     savings|      2023-07-21|2023|
|         4|1000.0|     savings|      2023-07-22|2023|
|         4|-200.0|     savings|      2023-07-25|2023|
|         5| 250.0|    checking|      2023-07-28|2023|
|         5| -50.0|    checking|      2023-07-30|2023|
+----------+------+------------+----------------+----+



In [18]:
# Attach the quantile-based "level" label to every transaction
transactions_with_level = add_transaction_level(spark=spark, transactions_df=transactions_df)

In [19]:
# Print the transactions together with their "level" column
transactions_with_level.show()

+----------+------+------------+----------------+----+-------+
|account_id|amount|account_type|transaction_date|year|  level|
+----------+------+------------+----------------+----+-------+
|         1| 200.0|     savings|      2023-07-01|2023|average|
|         1| -50.0|     savings|      2023-07-05|2023|    low|
|         2| 500.0|    checking|      2023-07-10|2023|   high|
|         2|-100.0|    checking|      2023-07-12|2023|    low|
|         3| 400.0|     savings|      2023-07-20|2023|average|
|         3| -30.0|     savings|      2023-07-21|2023|average|
|         4|1000.0|     savings|      2023-07-22|2023|   high|
|         4|-200.0|     savings|      2023-07-25|2023|    low|
|         5| 250.0|    checking|      2023-07-28|2023|average|
|         5| -50.0|    checking|      2023-07-30|2023|    low|
+----------+------+------------+----------------+----+-------+



In [30]:
import pandas as pd
import re

def read_pyspark_output_to_dataframe(file_path):
    """Parse a text file holding a pipe-delimited table into a pandas DataFrame.

    The expected layout is the ASCII table printed by ``DataFrame.show()``:
    ``+---+---+`` separator lines and ``|``-delimited rows, the first of which
    is the header. Only lines that start and end with ``|`` are treated as
    table rows, so separator lines, blank lines and footers such as
    ``only showing top 20 rows`` are ignored, while rows whose cells are all
    blank or non-alphanumeric are kept.

    Args:
        file_path: Path of the text file to parse.

    Returns:
        A DataFrame whose columns come from the first table row and whose
        values are the whitespace-stripped cell strings (no type conversion).
        An empty DataFrame is returned when the file contains no table row.

    Note:
        Cells are split on every ``|``, so a cell value that itself contains
        ``|`` cannot be parsed correctly.
    """
    with open(file_path, 'r') as file:
        stripped_lines = [line.strip() for line in file]

    # A table row is delimited by a leading and a trailing pipe; the length
    # check rejects a lone "|" which would otherwise count as both.
    table_lines = [
        line for line in stripped_lines
        if len(line) >= 2 and line.startswith("|") and line.endswith("|")
    ]
    if not table_lines:
        return pd.DataFrame()

    def split_cells(line):
        # Drop the outer pipes, then split on the inner ones and trim padding
        return [cell.strip() for cell in line[1:-1].split("|")]

    columns = split_cells(table_lines[0])
    rows = [split_cells(line) for line in table_lines[1:]]
    return pd.DataFrame(rows, columns=columns)

# Example: parse the saved table text and print the resulting pandas DataFrame
file_path = 'parse_file.txt'
df = read_pyspark_output_to_dataframe(file_path=file_path)
print(df)


  id                Col1   Col2
0  1       one,two,three    one
1  2       four,one,five    six
2  3  seven,nine,one,two  eight
3  4      two,three,five   five
4  5        six,five,one  seven
