In [1]:
import pyspark 
sc = pyspark.SparkContext('local[*]')
# do something to prove it works
rdd = sc.parallelize(range(1000))
rdd.takeSample(False, 5)

[903, 320, 513, 229, 22]

In [6]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

# 5.	Read three datasets by Spark and print counts for each.

In [3]:
# accounts
df_accounts = spark.read \
    .option("header", "true")  \
    .option("inferSchema", "true")  \
    .csv("accounts.csv", sep=";")

df_accounts.count()

500000

In [4]:
# country_abbreviation
df_country_abbreviation = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("country_abbreviation.csv", sep=";") 

df_country_abbreviation.count()

121

In [5]:
# transactions
df_transactions = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("transactions.csv", sep=";")

df_transactions.count()

5000000

 •	Calculate how many accounts of each type there are using Spark SQL. The return type is a dataframe [account_type: string, account_type_count: int] 

In [2]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

df_transactions = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("transactions.csv", sep=";")

df_accounts = spark.read \
    .option("header", "true")  \
    .option("inferSchema", "true")  \
    .csv("accounts.csv", sep=";")

df_accounts.createOrReplaceTempView("accounts")

df_transactions.createOrReplaceTempView("transactions")

df_accounts_result = spark.sql("""
    SELECT t.account_type AS account_type,
           COUNT(t.account_type) AS account_type_count 
    FROM transactions t
    LEFT JOIN
         accounts a
    ON
        a.id = t.id
    GROUP BY t.account_type
    """)

df_accounts_result.show()

+------------+------------------+
|account_type|account_type_count|
+------------+------------------+
|    Personal|           1667072|
|Professional|           1667358|
|    Business|           1665570|
+------------+------------------+



In [3]:
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

df_transactions = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("transactions.csv", sep=";")

df_transactions.createOrReplaceTempView("transactions")


df_accounts_last_date_result = spark.sql("""
    SELECT id AS account_id,
           CAST(SUM(amount) AS STRING) AS balance,
           MAX(transaction_date) as latest_date
    FROM transactions 
    GROUP BY id
    """)

df_accounts_last_date_result.show()

+----------+------------------+-----------+
|account_id|           balance|latest_date|
+----------+------------------+-----------+
|    482333|          27174.07| 2020-07-17|
|    222048|          48004.81| 2020-07-20|
|    328078|          36948.25| 2020-02-01|
|    192401|          36736.98| 2020-01-30|
|    273916| 47475.37999999999| 2021-05-30|
|    485103|          62198.93| 2021-05-22|
|    300282|55103.619999999995| 2021-05-01|
|     20683|          56448.72| 2021-10-27|
|     15846| 58671.90999999999| 2020-12-23|
|    446783| 98085.51000000001| 2021-12-11|
|     92182|           42335.3| 2020-08-08|
|    477485|          22114.03| 2020-05-23|
|    171142|40428.899999999994| 2021-04-07|
|    317762|          40025.55| 2021-12-02|
|     65478|           57941.9| 2021-10-06|
|    306768|          26566.93| 2019-12-19|
|    380411|          43652.94| 2020-06-02|
|    304681|          37827.69| 2021-03-26|
|    475638|           44509.1| 2021-11-23|
|     97413|          39611.24| 

2.	Write a function using Spark Python or Spark Scala API to calculate total earnings (sum of transactions above 0) for each user from Switzerland by year as a pivot table. The result dataframe should contain user full names as one field split by whitespace, years, and earning values. 

In [1]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import sum, col, split, concat_ws
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

df_transactions = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("transactions.csv", sep=";")

# country_abbreviation
df_country_abbreviation = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("country_abbreviation.csv", sep=";") 


df_accounts = spark.read \
    .option("header", "true")  \
    .option("inferSchema", "true")  \
    .csv("accounts.csv", sep=";")


def get_total_earnings():
    df_transactions_joined = df_transactions \
        .join(df_country_abbreviation,
             df_transactions.country == df_country_abbreviation.abbreviation,
             "left") \
        .join(df_accounts.drop("country"), # we delete the country field to remove the ambiguity error.
             df_transactions.id == df_accounts.id,
             "left")

    df_filtered_1 = df_transactions_joined.filter((col("country") == "CH") & (col("amount") > 0))
    df_filtered_2 = df_filtered_1.withColumn("year", split(col("transaction_date"), "-")[0].cast("int"))
    df_filtered_3 = df_filtered_2.withColumn("full_name", concat_ws(" ", df_filtered_2.first_name, df_filtered_2.last_name))
    df_result = df_filtered_3.groupBy("full_name").pivot("year").sum("amount")
    df_result.show()


get_total_earnings()

+-----------------+------------------+------------------+-------+--------+-------+-------+------------------+--------+-------+-------+-------+
|        full_name|              2011|              2012|   2013|    2014|   2015|   2016|              2017|    2018|   2019|   2020|   2021|
+-----------------+------------------+------------------+-------+--------+-------+-------+------------------+--------+-------+-------+-------+
|      Luke Carter|              null|1585.4099999999999|  93.69|    null|   null|   null|           7029.37| 8340.16|   null|   null|   null|
|       Myra Owens|           7290.28|              null|   null| 7508.42|   null|   null|           19543.1|    null|   null|   null|   null|
|    Darcy Edwards|              null|              null|   null|    null|   null|   null|           7892.65| 8538.91|2252.85|   null|   null|
|    Honey Barrett|           8382.42|              null|   null|    null|   null| 4415.3|              null|    null|   null|   null|   null|

2.	Write a function that expects a transactions dataset as input and returns it with an additional column "level". The value of "level " is calculated based on the "amount" column as:
•	Top 25% of all transactions get a value "high".
•	The next 50% of all transactions get "average".
•	The rest gets "low".

In [1]:
from pyspark.sql.functions import col, when,lit, monotonically_increasing_id
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()

# transactions
df_transactions = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("transactions.csv", sep=";")

def get_level_by_percent(data_set):
    total_count = data_set.count()
    data_set_ordered = data_set.orderBy("amount", ascending=False)
    data_set_positioned = data_set_ordered.withColumn("order", monotonically_increasing_id())
    data_set_count = data_set_positioned.withColumn("total",lit(total_count))
    data_set_count.withColumn("level",
                                when(((col("order")/col("total"))*100)<= lit(25), "high") \
                               .when(((((col("order")/col("total"))*100)>= lit(25)) & (((col("order")/col("total"))*100)<= lit(50))), "average") \
                               .otherwise("low")).select("id","account_type","amount","country","level").show()


get_level_by_percent(df_transactions)
    

+------+------------+-------+-------+-----+
|    id|account_type| amount|country|level|
+------+------------+-------+-------+-----+
|426326|    Business|9999.99|     NL| high|
|281332|Professional|9999.99|     GT| high|
|103983|    Personal|9999.98|     CR| high|
|259119|    Personal|9999.98|     UG| high|
|317820|Professional|9999.98|     EE| high|
|303197|    Personal|9999.98|     UA| high|
| 58619|    Personal|9999.97|     SA| high|
|189443|    Business|9999.97|     CN| high|
|446171|    Personal|9999.97|     GB| high|
| 99581|    Business|9999.97|     KE| high|
|342602|    Personal|9999.97|     PE| high|
|471114|    Personal|9999.96|     GR| high|
|199378|Professional|9999.96|     PT| high|
|360159|    Personal|9999.96|     CI| high|
|203948|    Personal|9999.95|     TT| high|
|414850|    Business|9999.95|     ET| high|
|473894|    Personal|9999.95|     ZA| high|
|164490|    Business|9999.93|     LT| high|
|446055|    Business|9999.93|     SG| high|
|202496|    Business|9999.93|   

2.	Write a function that reads a text file in the format presented below, and then ensure that it returns a data frame of table content with proper columns.

In [7]:
from pyspark.sql.functions import col,lit, regexp_replace
import pandas as pd
from pyspark.sql import SparkSession


# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read CSV Example") \
    .getOrCreate()


def get_format_file():
    format_file = spark.read.text("format_file.txt")
    firts_format = format_file.withColumn("firts_format",regexp_replace(format_file["value"],"-","")) \
    
    file_formated = firts_format \
          .withColumn("final_format",regexp_replace(firts_format["firts_format"],"\+","")) \
          .filter(col("final_format") != lit(""))
    
    file_header = file_formated.first()
    df_pandas = file_formated \
        .filter(file_formated["final_format"] != file_header["final_format"]) \
        .select("final_format").toPandas()

    df = df_pandas['final_format'].str.split('|', expand=True)
    df = df.drop(df.columns[0], axis=1)
    df = df.drop(df.columns[-1], axis=1)
    final_df = spark.createDataFrame(df)
    final_df.show()

get_format_file()

+---+------------------+-----+
|  1|                 2|    3|
+---+------------------+-----+
|  1|     one,two,three|  one|
|  2|     four,one,five|  six|
|  3|seven,nine,one,two|eight|
|  4|    two,three,five| five|
|  5|      six,five,one|seven|
+---+------------------+-----+

