In [33]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Optimized Spark Session") \
    .master("local[4]") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "1g") \
    .config("spark.sql.shuffle.partitions", "50") \
    .getOrCreate()

print('Active Spark Sessions',spark.sparkContext.uiWebUrl)

Active Spark Sessions http://10.255.255.254:4040


In [34]:
cs = spark.sparkContext
print(f"Spark version: {spark.version}")

Spark version: 4.0.0


In [24]:
file = 'MoldovaGaz-1-2020.csv'

In [31]:
df = spark.read.csv(file, sep=',').show(5)

+----------------+-------------------+-------------+--------------+-------------------+----------------+---------------+--------------------+-----------------+------------------+--------------------+-------------+----+-----------+----+----------+------------+---------------+--------------------+---------------+
|             _c0|                _c1|          _c2|           _c3|                _c4|             _c5|            _c6|                 _c7|              _c8|               _c9|                _c10|         _c11|_c12|       _c13|_c14|      _c15|        _c16|           _c17|                _c18|           _c19|
+----------------+-------------------+-------------+--------------+-------------------+----------------+---------------+--------------------+-----------------+------------------+--------------------+-------------+----+-----------+----+----------+------------+---------------+--------------------+---------------+
|  invoice_number|       invoice_date|invoice_owner|invoice_a

In [32]:
spark.read.csv(file, sep=',', header=True).show(5)

+----------------+-------------------+-------------+--------------+-------------------+----------------+---------------+--------------------+-----------------+------------------+--------------------+-------------+----+-----------+----+----------+------------+---------------+--------------------+---------------+
|  invoice_number|       invoice_date|invoice_owner|invoice_amount|   invoice_due_date|payer_first_name|payer_last_name|payer_address_street|payer_address_zip|payer_address_city|payer_address_county|      service|debt|consumption|unit|unit_price|compensation|monthly_payment|       total_ammount|contract_number|
+----------------+-------------------+-------------+--------------+-------------------+----------------+---------------+--------------------+-----------------+------------------+--------------------+-------------+----+-----------+----+----------+------------+---------------+--------------------+---------------+
|932/093257673515|2020-01-31 00:00:00|   MoldovaGaz|      500

In [27]:
spark.read.csv(file, sep=',').show(5)

+----------------+-------------------+-------------+--------------+-------------------+----------------+---------------+--------------------+-----------------+------------------+--------------------+-------------+----+-----------+----+----------+------------+---------------+--------------------+---------------+
|             _c0|                _c1|          _c2|           _c3|                _c4|             _c5|            _c6|                 _c7|              _c8|               _c9|                _c10|         _c11|_c12|       _c13|_c14|      _c15|        _c16|           _c17|                _c18|           _c19|
+----------------+-------------------+-------------+--------------+-------------------+----------------+---------------+--------------------+-----------------+------------------+--------------------+-------------+----+-----------+----+----------+------------+---------------+--------------------+---------------+
|  invoice_number|       invoice_date|invoice_owner|invoice_a

In [36]:
df = spark.read.csv(file, sep=',', header=True)

In [37]:
df.printSchema()

root
 |-- invoice_number: string (nullable = true)
 |-- invoice_date: string (nullable = true)
 |-- invoice_owner: string (nullable = true)
 |-- invoice_amount: string (nullable = true)
 |-- invoice_due_date: string (nullable = true)
 |-- payer_first_name: string (nullable = true)
 |-- payer_last_name: string (nullable = true)
 |-- payer_address_street: string (nullable = true)
 |-- payer_address_zip: string (nullable = true)
 |-- payer_address_city: string (nullable = true)
 |-- payer_address_county: string (nullable = true)
 |-- service: string (nullable = true)
 |-- debt: string (nullable = true)
 |-- consumption: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_price: string (nullable = true)
 |-- compensation: string (nullable = true)
 |-- monthly_payment: string (nullable = true)
 |-- total_ammount: string (nullable = true)
 |-- contract_number: string (nullable = true)



In [44]:
#Recreate the DataFrame From RDD (Deepest Clone)
df2 = spark.createDataFrame(df.rdd, df.schema)
#Transform column to date
df2 = df2.withColumn('invoice_date', F.col('invoice_date').cast('date'))
df2.printSchema()

root
 |-- invoice_number: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- invoice_owner: string (nullable = true)
 |-- invoice_amount: string (nullable = true)
 |-- invoice_due_date: string (nullable = true)
 |-- payer_first_name: string (nullable = true)
 |-- payer_last_name: string (nullable = true)
 |-- payer_address_street: string (nullable = true)
 |-- payer_address_zip: string (nullable = true)
 |-- payer_address_city: string (nullable = true)
 |-- payer_address_county: string (nullable = true)
 |-- service: string (nullable = true)
 |-- debt: string (nullable = true)
 |-- consumption: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_price: string (nullable = true)
 |-- compensation: string (nullable = true)
 |-- monthly_payment: string (nullable = true)
 |-- total_ammount: string (nullable = true)
 |-- contract_number: string (nullable = true)



In [46]:
df2 = df2\
    .withColumn('invoice_amount', F.col('invoice_amount').cast('float'))\
    .withColumn('invoice_due_date', F.col('invoice_due_date').cast('date'))\
    .withColumn('debt', F.col('debt').cast('float'))
df2.printSchema()

root
 |-- invoice_number: string (nullable = true)
 |-- invoice_date: date (nullable = true)
 |-- invoice_owner: string (nullable = true)
 |-- invoice_amount: float (nullable = true)
 |-- invoice_due_date: date (nullable = true)
 |-- payer_first_name: string (nullable = true)
 |-- payer_last_name: string (nullable = true)
 |-- payer_address_street: string (nullable = true)
 |-- payer_address_zip: string (nullable = true)
 |-- payer_address_city: string (nullable = true)
 |-- payer_address_county: string (nullable = true)
 |-- service: string (nullable = true)
 |-- debt: float (nullable = true)
 |-- consumption: string (nullable = true)
 |-- unit: string (nullable = true)
 |-- unit_price: string (nullable = true)
 |-- compensation: string (nullable = true)
 |-- monthly_payment: string (nullable = true)
 |-- total_ammount: string (nullable = true)
 |-- contract_number: string (nullable = true)



Select

In [52]:
df.select('payer_last_name','payer_address_street','payer_address_city').distinct().show(10, truncate=False)

+---------------+---------------------------------------------------------+------------------+
|payer_last_name|payer_address_street                                     |payer_address_city|
+---------------+---------------------------------------------------------+------------------+
|Ene            |Bulevardul Mocanu Nr. 42 Bl. 19  Sc. 47 Ap. 758          |Valea lui Mihai   |
|Tudor          |Soseaua Barbu                                            |Borcea            |
|Florea         |Strada Dochioiu                                          |Strehaia          |
|Nistor         |Intrarea Andrei Ioniță Nr. 35                            |Prejmer           |
|Dinu           |Aleea Albu                                               |Barbulesti        |
|Ene            |Strada Floare Dumitrescu                                 |Bacau             |
|Tabacu         |Drumul Suciu Nr. 56                                      |Breaza de Sus     |
|Ionescu        |Drumul Voinea                    

In [54]:
df.select('payer_last_name','payer_address_street','payer_address_city').distinct().where(F.col('total_ammount')>='300'.show(10, truncate=False)

+---------------+--------------------+------------------+
|payer_last_name|payer_address_street|payer_address_city|
+---------------+--------------------+------------------+
+---------------+--------------------+------------------+



GroupBy

In [48]:
df\
    .groupBy('payer_address_street')\
    .agg(F.count('*').alias('total_rows'))\
    .orderBy(F.col('total_rows').desc())\
    .show()



+--------------------+----------+
|payer_address_street|total_rows|
+--------------------+----------+
|       Strada Nistor|       269|
|   Bulevardul Nistor|       261|
|       Drumul Nistor|       249|
|        Aleea Nistor|       249|
|      Soseaua Nistor|       245|
|     Intrarea Nistor|       224|
|      Intrarea Marin|       160|
|         Drumul Dima|       158|
|      Bulevardul Ene|       152|
|        Aleea Ioniță|       149|
|     Bulevardul Stan|       149|
|         Aleea Dobre|       148|
|          Aleea Niță|       148|
|       Drumul Ababei|       147|
|Bulevardul Gheorghiu|       147|
|     Aleea Georgescu|       146|
|          Aleea Popa|       145|
|   Bulevardul Stancu|       145|
|       Soseaua Tudor|       145|
|       Drumul Manole|       143|
+--------------------+----------+
only showing top 20 rows


                                                                                

Filter

In [None]:
df_filtered = df\
            .where(F.col('payer_address_city') == 'Barbulesti')\
            .where(F.col('value').isNotNull())