In [1]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

In [2]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()
# accounts
df_accounts = spark.read \
    .option("header", "true")  \
    .option("inferSchema", "true")  \
    .csv("accounts.csv", sep=";")

df_accounts.count()


500000

In [3]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

# country_abbreviation
df_country_abbreviation = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("country_abbreviation.csv", sep=";") 

df_country_abbreviation.count()

121

In [4]:
from pyspark.sql import SparkSession
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

# transactions
df_transactions = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("transactions.csv", sep=";")

df_transactions.count()



5000000

In [None]:
spark.stop()

• Calculate how many accounts of each type there are using Spark SQL. The return type is a dataframe [account_type: string, account_type_count: int]

In [5]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder.appName("AccountTypeCount").getOrCreate()

df_transactions = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("transactions.csv", sep=";")

df_accounts = spark.read \
    .option("header", "true")  \
    .option("inferSchema", "true")  \
    .csv("accounts.csv", sep=";")

df_accounts.createOrReplaceTempView("accounts")

df_transactions.createOrReplaceTempView("transactions")

df_accounts_result = spark.sql("""
    SELECT t.account_type AS account_type, COUNT(t.account_type) AS account_type_count 
    FROM transactions t
    LEFT JOIN accounts a
    ON a.id = t.id
    GROUP BY t.account_type
    """)

df_accounts_result.show()

+------------+------------------+
|account_type|account_type_count|
+------------+------------------+
|    Personal|           1667072|
|Professional|           1667358|
|    Business|           1665570|
+------------+------------------+



 •	Calculate only the balance and the latest date for each account from transactions.csv. To calculate the balance, summarize all the transactions for  
    each account. The return type is a dataframe [account_id: string, balance: string, latest_date: date].

In [None]:
from pyspark.sql import SparkSession

def analyze_transaction_data():
    # Create a SparkSession
    spark = SparkSession.builder \
        .appName("Transaction Data Analysis") \
        .getOrCreate()

    # Read transactions data from CSV
    transactions_df = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv("transactions.csv", sep=";")

    # Create a temporary view for the transactions DataFrame
    transactions_df.createOrReplaceTempView("transactions")

    # Analyze transaction data to get account balances and latest dates
    accounts_last_date_result_df = spark.sql("""
        SELECT id AS account_id,
               CAST(SUM(amount) AS STRING) AS balance,
               MAX(transaction_date) as latest_date
        FROM transactions 
        GROUP BY id
    """)

    # Show the results
    accounts_last_date_result_df.show()

analyze_transaction_data()


2.	Write a function using Spark Python or Spark Scala API to calculate total earnings (sum of transactions above 0) for each user from Switzerland by year as a pivot table. The result dataframe should contain user full names as one field split by whitespace, years, and earning values. 

In [None]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import sum, col, split, concat_ws
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

df_transactions = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("transactions.csv", sep=";")

# country_abbreviation
df_country_abbreviation = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("country_abbreviation.csv", sep=";") 


df_accounts = spark.read \
    .option("header", "true")  \
    .option("inferSchema", "true")  \
    .csv("accounts.csv", sep=";")


def get_total_earnings():
    df_transactions_1 = df_transactions \
        .join(df_country_abbreviation,
             df_transactions.country == df_country_abbreviation.abbreviation,
             "inner") \
        .join(df_accounts.drop("country"), 
             df_transactions.id == df_accounts.id,
             "inner")

    df_filtered_1 = df_transactions_1.filter((col("country") == "CH") & (col("amount") > 0))
    df_filtered_2 = df_filtered_1.withColumn("year", split(col("transaction_date"), "-")[0].cast("int"))
    df_filtered_3 = df_filtered_2.withColumn("full_name", concat_ws(" ", df_filtered_2.first_name, df_filtered_2.last_name))

    df_partitioned = df_filtered_3.repartition("country")
    
    df_result = df_partitioned.groupBy("full_name").pivot("year").sum("amount")
    df_result.show()


get_total_earnings()

In [None]:
2.	Write a function that expects a transactions dataset as input and returns it with an additional column "level". The value of "level " is calculated based on the "amount" column as:
•	Top 25% of all transactions get a value "high".
•	The next 50% of all transactions get "average".
•	The rest gets "low".

In [4]:
from pyspark.sql.functions import col, when,lit, monotonically_increasing_id
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import broadcast, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define schema for the Row
schema = StructType([
    StructField("total", IntegerType(), True)
])

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()


# transactions
df_transactions = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("transactions.csv", sep=";")

def get_level_by_percent(data_set):
    # Create a SparkSession
    spark = SparkSession.builder \
        .appName("Get Level by Percent") \
        .getOrCreate()

    # Read transactions data
    df_transactions = spark.read \
        .option("header", "true") \
        .option("inferSchema", "true") \
        .csv("transactions.csv", sep=";")

    # Count the total number of rows in the dataset
    total_count = data_set.count()

    # Broadcast the total count
    broadcast_df_small = broadcast(spark.createDataFrame([Row(total=total_count)]))

    # Order the data by "amount" in descending order
    data_set_ordered = data_set.orderBy("amount", ascending=False)

    # Join with broadcasted total count and add an "order" column
    data_set_positioned = data_set_ordered.join(broadcast_df_small).withColumn("order", monotonically_increasing_id())

    # Calculate the "level" based on the order and total count
    data_set_positioned.withColumn("level",
        when(((col("order") / col("total")) * 100) <= lit(25), "high") \
        .when(((col("order") / col("total")) * 100) <= lit(50), "average") \
        .otherwise("low")
    ).select("id", "account_type", "amount", "country", "level").show()

if __name__ == "__main__":
    get_level_by_percent(df_transactions)


+------+------------+-------+-------+-----+
|    id|account_type| amount|country|level|
+------+------------+-------+-------+-----+
|179528|    Business|-730.86|     SV| high|
|378343|    Personal|-946.98|     YE| high|
| 75450|Professional|7816.92|     SI| high|
|357719|    Business| 704.02|     ID| high|
|110511|    Personal| 3462.6|     BS| high|
|461830|Professional| 762.81|     CN| high|
| 30180|Professional|5390.24|     GN| high|
| 65398|    Personal|4765.77|     TR| high|
|170899|    Business|8775.89|     SK| high|
|234300|Professional|8455.18|     LU| high|
|208027|    Business| 6244.1|     AE| high|
|161212|    Personal|5904.56|     EG| high|
|105372|Professional|4079.76|     MT| high|
|205321|Professional| 3570.4|     MU| high|
|410863|    Business|2328.83|     SR| high|
|486752|Professional| 5454.8|     CU| high|
|208564|    Personal|8695.17|     IT| high|
|196682|    Personal|-905.87|     HU| high|
|491196|Professional|8781.02|     IR| high|
|108286|    Personal|3485.95|   

2.	Write a function that reads a text file in the format presented below, and then ensure that it returns a data frame of table content with proper columns.

In [19]:
from pyspark.sql.functions import col, lit, regexp_replace
from pyspark.sql import SparkSession
import pandas as pd

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

def get_format_file():
    # Read the text file
    file_read = spark.read.text("format_file.txt")

    # Clean and format the data
    file_cleaned = file_read.withColumn("first_format", regexp_replace(col("value"), "-", ""))
    file_formatted = file_cleaned.withColumn("final_format", regexp_replace(col("first_format"), "\\+", "")) \
        .filter(col("final_format") != lit(""))

    # Get the header and convert the DataFrame to Pandas
    file_header = file_formatted.first()
    df_pandas = file_formatted.filter(col("final_format") != file_header["final_format"]) \
        .select("final_format").toPandas()

    # Split the final_format column and rename columns
    df = df_pandas['final_format'].str.split('|', expand=True)
    df = df.drop(df.columns[0], axis=1)
    df = df.drop(df.columns[-1], axis=1)
    df.rename(columns={1: 'id', 2: 'Col1', 3: 'Col2'}, inplace=True)

    # Create a Spark DataFrame and show the result
    final_df = spark.createDataFrame(df)
    final_df.show()

# Call the function to execute the code
get_format_file()

+---+------------------+-----+
| id|              Col1| Col2|
+---+------------------+-----+
|  1|     one,two,three|  one|
|  2|     four,one,five|  six|
|  3|seven,nine,one,two|eight|
|  4|    two,three,five| five|
|  5|      six,five,one|seven|
+---+------------------+-----+

