In [1]:
import datetime

In [16]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import StructType, StructField,IntegerType, StringType, TimestampType
from pyspark.sql.functions import from_unixtime, col, to_date, date_format, unix_timestamp, lit,regexp_replace,split, udf, spark_partition_id

In [3]:
spark = SparkSession.builder.master("local[*]").appName('test').getOrCreate()

25/06/21 11:35:26 WARN Utils: Your hostname, hari-HP-Laptop-15s-du2xxx resolves to a loopback address: 127.0.1.1; using 172.17.0.1 instead (on interface docker0)
25/06/21 11:35:26 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/06/21 11:35:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark

In [5]:
# Create DataFrames for Employees and Departments
data_employees = [(1, "John", 1), (2, "Emma", 2), (3, "Raj", None), (4, "Nina", 4)]
data_departments = [(1, "HR"), (2, "Tech"), (3, "Marketing"), (None, "Temp"),(1, "HR")]

columns_employees = ["emp_id", "emp_name", "dept_id"]
columns_departments = ["dept_id", "dept_name"]

df_employees = spark.createDataFrame(data_employees, columns_employees)
df_departments = spark.createDataFrame(data_departments, columns_departments)

# Perform INNER JOIN
# since `inner` is the default join type, we can omit it
#df_joined = df_employees.join(df_departments, df_employees.dept_id == df_departments.dept_id)

# Show the result
#df_joined.show()

df_employees.show()
df_departments.show()

                                                                                

+------+--------+-------+
|emp_id|emp_name|dept_id|
+------+--------+-------+
|     1|    John|      1|
|     2|    Emma|      2|
|     3|     Raj|   null|
|     4|    Nina|      4|
+------+--------+-------+

+-------+---------+
|dept_id|dept_name|
+-------+---------+
|      1|       HR|
|      2|     Tech|
|      3|Marketing|
|   null|     Temp|
|      1|       HR|
+-------+---------+



In [10]:
df_repart = df_employees.coalesce(1)




In [25]:
df_repart_cached = df_repart.cache()

25/06/21 11:51:23 WARN CacheManager: Asked to cache already cached data.


In [11]:
df_employees.rdd.getNumPartitions()
df_repart.rdd.getNumPartitions()

1

In [20]:
(df_employees.groupBy(col('dept_id')).count()).show()

+-------+-----+
|dept_id|count|
+-------+-----+
|   null|    1|
|      1|    1|
|      2|    1|
|      4|    1|
+-------+-----+



In [19]:
(df_repart.groupBy(col('dept_id')).count()).show()

+-------+-----+
|dept_id|count|
+-------+-----+
|      1|    1|
|      2|    1|
|      4|    1|
|   null|    1|
+-------+-----+



                                                                                

In [26]:
(df_repart_cached.withColumn('part_id',spark_partition_id())).show()

+------+--------+-------+-------+
|emp_id|emp_name|dept_id|part_id|
+------+--------+-------+-------+
|     1|    John|      1|      0|
|     2|    Emma|      2|      0|
|     3|     Raj|   null|      0|
|     4|    Nina|      4|      0|
+------+--------+-------+-------+



In [27]:
(df_employees.withColumn('part_id',spark_partition_id())).show()

+------+--------+-------+-------+
|emp_id|emp_name|dept_id|part_id|
+------+--------+-------+-------+
|     1|    John|      1|      1|
|     2|    Emma|      2|      3|
|     3|     Raj|   null|      5|
|     4|    Nina|      4|      7|
+------+--------+-------+-------+



In [39]:


df = df_employees.join(df_departments, on= df_employees['dept_id'] == df_departments['dept_id'], how="outer")
#df = df_employees.crossJoin(df_departments)
df.explain(True)

df.rdd.getNumPartitions()

df_part = df.coalesce(10)
df_part.count()

df_part.rdd.getNumPartitions()

== Parsed Logical Plan ==
Join FullOuter, (dept_id#336L = dept_id#340L)
:- LogicalRDD [emp_id#334L, emp_name#335, dept_id#336L], false
+- LogicalRDD [dept_id#340L, dept_name#341], false

== Analyzed Logical Plan ==
emp_id: bigint, emp_name: string, dept_id: bigint, dept_id: bigint, dept_name: string
Join FullOuter, (dept_id#336L = dept_id#340L)
:- LogicalRDD [emp_id#334L, emp_name#335, dept_id#336L], false
+- LogicalRDD [dept_id#340L, dept_name#341], false

== Optimized Logical Plan ==
Join FullOuter, (dept_id#336L = dept_id#340L)
:- LogicalRDD [emp_id#334L, emp_name#335, dept_id#336L], false
+- LogicalRDD [dept_id#340L, dept_name#341], false

== Physical Plan ==
SortMergeJoin [dept_id#336L], [dept_id#340L], FullOuter
:- *(2) Sort [dept_id#336L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(dept_id#336L, 200), true, [id=#1775]
:     +- *(1) Scan ExistingRDD[emp_id#334L,emp_name#335,dept_id#336L]
+- *(4) Sort [dept_id#340L ASC NULLS FIRST], false, 0
   +- Exchange hashpartit

10

In [33]:
df_part.rdd.getNumPartitions()

5

In [40]:
from pyspark.sql.functions import broadcast,spark_partition_id

df = df_employees.join(broadcast(df_departments), on= df_employees['dept_id'] == df_departments['dept_id'], how="left")
#df = df_employees.crossJoin(df_departments)
df_part.withColumn('part_id',spark_partition_id()).show()

+------+--------+-------+-------+---------+-------+
|emp_id|emp_name|dept_id|dept_id|dept_name|part_id|
+------+--------+-------+-------+---------+-------+
|     3|     Raj|   null|   null|     null|      2|
|  null|    null|   null|   null|     Temp|      2|
|     1|    John|      1|      1|       HR|      3|
|     1|    John|      1|      1|       HR|      3|
|  null|    null|   null|      3|Marketing|      5|
|     2|    Emma|      2|      2|     Tech|      6|
|     4|    Nina|      4|   null|     null|      7|
+------+--------+-------+-------+---------+-------+



In [41]:
data = ["hari prasath", "hari prasath weds ellakiya", "hari prasath works in infosys"]

In [48]:
rdd = spark.sparkContext.parallelize(data)

In [55]:
rdd_1 = (
        rdd.flatMap(lambda x:x.split())
        .map(lambda x:(x,1))
        .reduceByKey(lambda x,y : x + y)
)


# Whow this works 
'''
Rdd reads line by line
1) flatMap() -> we use flatMap to flatten the string ==> hari prasth => [hari,prasath]
2) map() -> maps every list with a value ==> [hari, prasath] => [(hari,1),(prasath,1]
3) reduceByKey() -> use to group the results and add the keys ==> [


'''

In [56]:
rdd_1.collect()

                                                                                

[('infosys', 1),
 ('hari', 3),
 ('in', 1),
 ('ellakiya', 1),
 ('works', 1),
 ('prasath', 3),
 ('weds', 1)]

In [57]:
rdd_2 = (
        rdd.flatMap(lambda x:x.split())
        .map(lambda x:(x,1))
)


AttributeError: 'function' object has no attribute '_get_object_id'

In [6]:
schema = StructType([
        StructField('userId',IntegerType(), False),
        StructField('movieId',IntegerType(), False),
        StructField('tag',StringType(),False),
        StructField('timestamp',IntegerType(),False)
])

In [7]:
data = spark.read.schema(schema).option("header",True).format("csv").load("data/tags.csv")

In [8]:
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)



In [9]:
data_timecnv = (
        data.withColumn('timestamp',from_unixtime(col('timestamp'),'yyyy-MM-dd HH:MM:SS.SS' ))
)

In [10]:
data_timecnv.show(truncate=False)

                                                                                

+------+-------+-----------------+----------------------+
|userId|movieId|tag              |timestamp             |
+------+-------+-----------------+----------------------+
|18    |4141   |Mark Waters      |2009-04-24 23:04:00.00|
|65    |208    |dark hero        |2013-05-10 07:05:00.00|
|65    |353    |dark hero        |2013-05-10 07:05:00.00|
|65    |521    |noir thriller    |2013-05-10 07:05:00.00|
|65    |592    |dark hero        |2013-05-10 07:05:00.00|
|65    |668    |bollywood        |2013-05-10 07:05:00.00|
|65    |898    |screwball comedy |2013-05-10 07:05:00.00|
|65    |1248   |noir thriller    |2013-05-10 07:05:00.00|
|65    |1391   |mars             |2013-05-10 07:05:00.00|
|65    |1617   |neo-noir         |2013-05-10 07:05:00.00|
|65    |1694   |jesus            |2013-05-10 07:05:00.00|
|65    |1783   |noir thriller    |2013-05-10 07:05:00.00|
|65    |2022   |jesus            |2013-05-10 07:05:00.00|
|65    |2193   |dragon           |2013-05-10 07:05:00.00|
|65    |2353  

In [11]:
def lower_tags(column):
    return column.lower()

In [17]:
# spark.udf.register('udf_name', function, return type)

In [14]:
udf_lower = udf(lower_tags,StringType())

In [15]:
df_udf = data_timecnv.withColumn('lower_tags', udf_lower(col('tag')))

In [16]:
df_udf.show()

+------+-------+-----------------+--------------------+-----------------+
|userId|movieId|              tag|           timestamp|       lower_tags|
+------+-------+-----------------+--------------------+-----------------+
|    18|   4141|      Mark Waters|2009-04-24 23:04:...|      mark waters|
|    65|    208|        dark hero|2013-05-10 07:05:...|        dark hero|
|    65|    353|        dark hero|2013-05-10 07:05:...|        dark hero|
|    65|    521|    noir thriller|2013-05-10 07:05:...|    noir thriller|
|    65|    592|        dark hero|2013-05-10 07:05:...|        dark hero|
|    65|    668|        bollywood|2013-05-10 07:05:...|        bollywood|
|    65|    898| screwball comedy|2013-05-10 07:05:...| screwball comedy|
|    65|   1248|    noir thriller|2013-05-10 07:05:...|    noir thriller|
|    65|   1391|             mars|2013-05-10 07:05:...|             mars|
|    65|   1617|         neo-noir|2013-05-10 07:05:...|         neo-noir|
|    65|   1694|            jesus|2013

Traceback (most recent call last):                                  (0 + 1) / 1]
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 642, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError
                                                                                

In [31]:
spark.catalog.listFunctions()

Function(name='!', description=None, className='org.apache.spark.sql.catalyst.expressions.Not', isTemporary=True)

In [27]:
for udf in udfs:
    print(udf)

In [92]:
data = data.withColumn("extra_spaces",lit("this      is    a  column"))

In [114]:
data_2 = data.withColumn("replaced_extra_spaces",regexp_replace(col('extra_spaces'),r"\s+"," ")).withColumn('convt_str',split(col('replaced_extra_spaces'),' '))

In [119]:
data.count()

465564

In [116]:
from pyspark.sql.functions import explode

In [120]:
data_2.withColumn('ex_arr',explode(col('convt_str'))).count()

1862256

In [21]:
(
    data.write.mode('overwrite')
        .parquet(f'output_data/tags_{datetime.date.today()}')
)

25/06/14 10:55:57 WARN CSVHeaderChecker: Number of column in CSV header is not equal to number of fields in the schema:
 Header length: 4, schema size: 3
CSV file: file:///home/hari/python-notebooks/data/tags.csv
                                                                                

In [33]:
import pyspark.sql.functions as F

In [34]:
df = (
    spark.read.option("header",True).csv("data/upi_transactions_2024.csv")
)

In [35]:
df.printSchema()

root
 |-- transaction id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- transaction type: string (nullable = true)
 |-- merchant_category: string (nullable = true)
 |-- amount (INR): string (nullable = true)
 |-- transaction_status: string (nullable = true)
 |-- sender_age_group: string (nullable = true)
 |-- receiver_age_group: string (nullable = true)
 |-- sender_state: string (nullable = true)
 |-- sender_bank: string (nullable = true)
 |-- receiver_bank: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- network_type: string (nullable = true)
 |-- fraud_flag: string (nullable = true)
 |-- hour_of_day: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- is_weekend: string (nullable = true)



In [36]:
import re

In [37]:
def sanatize_column_name(column : str, replace_str : str) -> DataFrame:

    def sanitize(column):
        sanitize_df = re.sub(r'\W+',replace_str,column)
        sanitize_df = sanitize_df.strip(replace_str)
        return sanitize_df

    return sanitize(column).lower()    
        
        

In [38]:
# Column Renaming 
import re
columns = [sanatize_column_name(column,'_') for column in df.columns]

print(columns)
df_renamed = df.toDF(*columns)

['transaction_id', 'timestamp', 'transaction_type', 'merchant_category', 'amount_inr', 'transaction_status', 'sender_age_group', 'receiver_age_group', 'sender_state', 'sender_bank', 'receiver_bank', 'device_type', 'network_type', 'fraud_flag', 'hour_of_day', 'day_of_week', 'is_weekend']


In [39]:
df_renamed.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- transaction_type: string (nullable = true)
 |-- merchant_category: string (nullable = true)
 |-- amount_inr: string (nullable = true)
 |-- transaction_status: string (nullable = true)
 |-- sender_age_group: string (nullable = true)
 |-- receiver_age_group: string (nullable = true)
 |-- sender_state: string (nullable = true)
 |-- sender_bank: string (nullable = true)
 |-- receiver_bank: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- network_type: string (nullable = true)
 |-- fraud_flag: string (nullable = true)
 |-- hour_of_day: string (nullable = true)
 |-- day_of_week: string (nullable = true)
 |-- is_weekend: string (nullable = true)



In [214]:
df_renamed.select([
    F.count(F.when((F.col(c).isNull()) | (F.col(c) == '' )| (F.isnan(c)),c)).alias(f'{c}_nulls')
    for c in df_renamed.columns
]).show()



+--------------------+---------------+----------------------+-----------------------+----------------+------------------------+----------------------+------------------------+------------------+-----------------+-------------------+-----------------+------------------+----------------+-----------------+-----------------+----------------+
|transaction_id_nulls|timestamp_nulls|transaction_type_nulls|merchant_category_nulls|amount_inr_nulls|transaction_status_nulls|sender_age_group_nulls|receiver_age_group_nulls|sender_state_nulls|sender_bank_nulls|receiver_bank_nulls|device_type_nulls|network_type_nulls|fraud_flag_nulls|hour_of_day_nulls|day_of_week_nulls|is_weekend_nulls|
+--------------------+---------------+----------------------+-----------------------+----------------+------------------------+----------------------+------------------------+------------------+-----------------+-------------------+-----------------+------------------+----------------+-----------------+----------------

                                                                                

In [216]:
df_renamed.groupBy('transaction_status').agg(F.count('*')).show()

+------------------+--------+
|transaction_status|count(1)|
+------------------+--------+
|           SUCCESS|  237554|
|            FAILED|   12446|
+------------------+--------+



In [219]:
df_renamed.filter(F.col('transaction_status') == 'FAILED').show(2)

+--------------+-------------------+----------------+-----------------+----------+------------------+----------------+------------------+------------+-----------+-------------+-----------+------------+----------+-----------+-----------+----------+
|transaction_id|          timestamp|transaction_type|merchant_category|amount_inr|transaction_status|sender_age_group|receiver_age_group|sender_state|sender_bank|receiver_bank|device_type|network_type|fraud_flag|hour_of_day|day_of_week|is_weekend|
+--------------+-------------------+----------------+-----------------+----------+------------------+----------------+------------------+------------+-----------+-------------+-----------+------------+----------+-----------+-----------+----------+
| TXN0000000024|2024-04-27 20:17:46|             P2M|          Grocery|       264|            FAILED|           18-25|             36-45|       Delhi|      Kotak|     Yes Bank|    Android|          4G|         0|         20|   Saturday|         1|
| TXN000

In [228]:
df_renamed.select(F.col('receiver_age_group')).distinct().show()

+------------------+
|receiver_age_group|
+------------------+
|             18-25|
|             26-35|
|               56+|
|             46-55|
|             36-45|
+------------------+



In [241]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.window import Window


In [230]:
df_renamed = df_renamed.withColumn("salt", monotonically_increasing_id() % 10)

In [231]:
df_renamed.show()

+--------------+-------------------+----------------+-----------------+----------+------------------+----------------+------------------+--------------+-----------+-------------+-----------+------------+----------+-----------+-----------+----------+----+
|transaction_id|          timestamp|transaction_type|merchant_category|amount_inr|transaction_status|sender_age_group|receiver_age_group|  sender_state|sender_bank|receiver_bank|device_type|network_type|fraud_flag|hour_of_day|day_of_week|is_weekend|salt|
+--------------+-------------------+----------------+-----------------+----------+------------------+----------------+------------------+--------------+-----------+-------------+-----------+------------+----------+-----------+-----------+----------+----+
| TXN0000000001|2024-11-05 15:30:02|             P2P|    Entertainment|       534|           SUCCESS|           26-35|             26-35|     Rajasthan|   IndusInd|          SBI|    Android|          4G|         0|         15|    Tuesd

In [232]:
### data with duplicates

gpt_df = spark.read.option('header',True).csv('data/sample_100k_with_duplicates.csv')

In [238]:
gpt_dd = gpt_df.drop_duplicates()

In [243]:
wind_spec = Window.partitionBy(F.col('id')).orderBy(F.col('date').desc())

In [253]:
gpt_dd_wf = (
    gpt_df.withColumn('rn',F.row_number().over(wind_spec))
    .filter(F.col('rn') == 1)
    .orderBy(F.col('id'))
    .drop(F.col('rn'))
).show()

+-----+----------------+----------+
|   id|            name|      date|
+-----+----------------+----------+
|    1|  William Miller|2024-08-12|
|  100|     Terry Clark|2024-08-25|
| 1000| Timothy Johnson|2024-04-12|
|10000| William Ramirez|2023-09-13|
|10001|   Brenda Miller|2023-11-15|
|10002|  Brandon Medina|2024-04-12|
|10003| Valerie Johnson|2024-03-11|
|10004|  Derrick Duncan|2024-06-02|
|10005|     Henry Scott|2023-12-13|
|10006|   Steven Lawson|2024-08-03|
|10007| Jasmine Hawkins|2024-11-27|
|10008|Tracey Maldonado|2024-04-05|
|10009|   Jennifer Ball|2024-12-22|
|10010|Stephen Ferguson|2024-07-05|
|10014|Danielle Patrick|2024-01-31|
|10015|     Jason Evans|2024-12-20|
|10016|     Jean Fisher|2024-11-22|
|10017| Nicole Martinez|2024-03-08|
|10018|  Dr. Ryan Russo|2024-05-05|
|10019| Kristine Barton|2024-10-06|
+-----+----------------+----------+
only showing top 20 rows



In [42]:
df_win = df_renamed.select(F.col('transaction_id'), F.col('amount_inr')).limit(10)

In [43]:
df_win.show()

+--------------+----------+
|transaction_id|amount_inr|
+--------------+----------+
| TXN0000000001|       534|
| TXN0000000002|      1951|
| TXN0000000003|       388|
| TXN0000000004|      1495|
| TXN0000000005|      4333|
| TXN0000000006|       113|
| TXN0000000007|       132|
| TXN0000000008|       774|
| TXN0000000009|       209|
| TXN0000000010|       648|
+--------------+----------+



In [44]:
from pyspark.sql.window import Window

In [56]:
window_spec = Window.orderBy(F.col('transaction_id')).rowsBetween(Window.unboundedPreceding,Window.currentRow)
window_spec2 = Window.orderBy(F.col('transaction_id')).rowsBetween(start=-2,end=Window.currentRow)

In [50]:
df_win.withColumn('rolling_sum',F.sum(F.col('amount_inr')).over(window_spec)).show()

25/06/16 22:50:24 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+--------------+----------+-----------+
|transaction_id|amount_inr|rolling_sum|
+--------------+----------+-----------+
| TXN0000000001|       534|      534.0|
| TXN0000000002|      1951|     2485.0|
| TXN0000000003|       388|     2873.0|
| TXN0000000004|      1495|     4368.0|
| TXN0000000005|      4333|     8701.0|
| TXN0000000006|       113|     8814.0|
| TXN0000000007|       132|     8946.0|
| TXN0000000008|       774|     9720.0|
| TXN0000000009|       209|     9929.0|
| TXN0000000010|       648|    10577.0|
+--------------+----------+-----------+



In [57]:
df_win.withColumn('rolling_sum',F.sum(F.col('amount_inr')).over(window_spec2)).show()

+--------------+----------+-----------+
|transaction_id|amount_inr|rolling_sum|
+--------------+----------+-----------+
| TXN0000000001|       534|      534.0|
| TXN0000000002|      1951|     2485.0|
| TXN0000000003|       388|     2873.0|
| TXN0000000004|      1495|     3834.0|
| TXN0000000005|      4333|     6216.0|
| TXN0000000006|       113|     5941.0|
| TXN0000000007|       132|     4578.0|
| TXN0000000008|       774|     1019.0|
| TXN0000000009|       209|     1115.0|
| TXN0000000010|       648|     1631.0|
+--------------+----------+-----------+



25/06/16 22:53:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


In [74]:
#### Python

In [77]:
"hari".split()

['hari']

In [79]:
' '.join(['hari','prasath'])

'hari prasath'

In [83]:
import re

In [102]:
rg = "hari     Prasath is   a data   engineer             "

In [89]:
re.sub(r"\s+"," ",rg).strip()

'hari prasath is a data engineer'

In [100]:
rg.title()

'Hari     Prasath Is   A Data   Engineer             '

In [103]:
rg.swapcase()

'HARI     pRASATH IS   A DATA   ENGINEER             '

In [107]:
for i in range(1,5):
    print('hari'.rjust(i*5))

 hari
      hari
           hari
                hari
