### Pip install commands

In [1]:
!pip install pyspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz

!pip install cryptography==38.0.3
!pip install findspark

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pip/_vendor/pkg_resources/__init__.py", line 3108, in _dep_map
    return self.__dep_map
  File "/usr/local/lib/python3.10/dist-packages/pip/_vendor/pkg_resources/__init__.py", line 2901, in __getattr__
    raise AttributeError(attr)
AttributeError: _DistInfoDistribution__dep_map

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/pip/_internal/cli/base_command.py", line 169, in exc_logging_wrapper
    status = run_func(*args)
  File "/usr/local/lib/python3.10/dist-packages/pip/_internal/cli/req_command.py", line 242, in wrapper
    return func(self, options, args)
  File "/usr/local/lib/python3.10/dist-packages/pip/_internal/commands/install.py", line 441, in run
    conflicts = self._determine_conflicts(to_install)
  File "/usr/local/lib/python3.10/dist-packages/pip/_internal/commands/install.py", line 

### Init Environment

In [2]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("example").getOrCreate()

### Load Parquet File

In [17]:
payments = spark.read.parquet("payments-input.snappy.parquet")

In [18]:
payments.schema

StructType(List(StructField(date,TimestampType,true),StructField(currency,StringType,true),StructField(amount,DoubleType,true),StructField(encoded_user_id,BinaryType,true)))

In [19]:
payments.show()

+-------------------+--------+------+--------------------+
|               date|currency|amount|     encoded_user_id|
+-------------------+--------+------+--------------------+
|2022-10-16 15:00:00|     ILS|1000.0|[67 41 41 41 41 4...|
|2022-10-16 15:00:00|     ILS|  13.5|[67 41 41 41 41 4...|
|2022-10-04 15:00:00|     EUR| 120.0|[67 41 41 41 41 4...|
|2022-10-04 15:00:00|     USD|  10.0|[67 41 41 41 41 4...|
|2022-10-22 15:00:00|     EUR|   9.0|[67 41 41 41 41 4...|
|2022-10-09 15:00:00|     USD|   1.0|[67 41 41 41 41 4...|
|2022-10-22 15:00:00|     EUR| 244.0|[67 41 41 41 41 4...|
|2022-10-09 15:00:00|     USD| 200.0|[67 41 41 41 41 4...|
|2022-10-16 15:00:00|     USD| 378.0|[67 41 41 41 41 4...|
|2022-10-04 15:00:00|     ILS|  11.0|[67 41 41 41 41 4...|
+-------------------+--------+------+--------------------+



## 1. Show Currencies Sum
Show the total sum amount for each currency.

In [20]:
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import col, sum

payments = payments.withColumn('amount', col('amount').cast(DecimalType(10, 2)))

total_amount_per_currency = payments\
  .groupBy('currency')\
  .agg(sum('amount'))\
  .alias('total_amount')

total_amount_per_currency.show()

+--------+-----------+
|currency|sum(amount)|
+--------+-----------+
|     EUR|     373.00|
|     ILS|    1024.50|
|     USD|     589.00|
+--------+-----------+



## 2. Exchange rates for every currency to ILS
Create a DataFrame of exchange rates for every currency to ILS, and use it to add a new column to the payments table, named amount_in_ils.


In [21]:
exchange_rates = spark.createDataFrame([("ILS", 1.0), ("USD", 3.2), ("EUR", 3.8)], ["currency", "exchange_rate"])

payments_with_ils = payments\
  .join(exchange_rates, 'currency')\
  .withColumn('amount_in_ils', col('amount') * col('exchange_rate'))\
  .drop("exchange_rate")

payments_with_ils.show()

+--------+-------------------+-------+--------------------+------------------+
|currency|               date| amount|     encoded_user_id|     amount_in_ils|
+--------+-------------------+-------+--------------------+------------------+
|     ILS|2022-10-04 15:00:00|  11.00|[67 41 41 41 41 4...|              11.0|
|     ILS|2022-10-16 15:00:00|  13.50|[67 41 41 41 41 4...|              13.5|
|     ILS|2022-10-16 15:00:00|1000.00|[67 41 41 41 41 4...|            1000.0|
|     USD|2022-10-16 15:00:00| 378.00|[67 41 41 41 41 4...|1209.6000000000001|
|     USD|2022-10-09 15:00:00| 200.00|[67 41 41 41 41 4...|             640.0|
|     USD|2022-10-09 15:00:00|   1.00|[67 41 41 41 41 4...|               3.2|
|     USD|2022-10-04 15:00:00|  10.00|[67 41 41 41 41 4...|              32.0|
|     EUR|2022-10-22 15:00:00| 244.00|[67 41 41 41 41 4...| 927.1999999999999|
|     EUR|2022-10-22 15:00:00|   9.00|[67 41 41 41 41 4...|34.199999999999996|
|     EUR|2022-10-04 15:00:00| 120.00|[67 41 41 41 4

## 3. Decrypt `encoded_user_id` column
Use the function below to decrypt the values in the encoded_user_id column.


In [22]:
from cryptography.fernet import Fernet
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def decode(encoded_used_id: bytes) -> str:
    key = b'CuTaE-KQM5MOZkExifXvfssUzXxU4TtNQyiggxCh8G8='
    used_id: str = Fernet(key).decrypt(encoded_used_id.decode()).decode()
    return used_id

decode_udf = udf(decode, StringType())
payments_decoded = payments\
  .withColumn("decoded_user_id", decode_udf("encoded_user_id"))\
  .drop("encoded_user_id")

payments_decoded.show()

+-------------------+--------+-------+----------------+
|               date|currency| amount| decoded_user_id|
+-------------------+--------+-------+----------------+
|2022-10-16 15:00:00|     ILS|1000.00| Osnat Haj Yahia|
|2022-10-16 15:00:00|     ILS|  13.50|Michael Livshits|
|2022-10-04 15:00:00|     EUR| 120.00|      Uri Shohet|
|2022-10-04 15:00:00|     USD|  10.00|       Dana Assa|
|2022-10-22 15:00:00|     EUR|   9.00|       Dana Assa|
|2022-10-09 15:00:00|     USD|   1.00|         Tom Mor|
|2022-10-22 15:00:00|     EUR| 244.00|    Hersh Shefer|
|2022-10-09 15:00:00|     USD| 200.00|       Sagi Vegh|
|2022-10-16 15:00:00|     USD| 378.00|Dmitry Burshtein|
|2022-10-04 15:00:00|     ILS|  11.00|     Itay Granik|
+-------------------+--------+-------+----------------+

