<a href="https://colab.research.google.com/github/Dev-Parmar17/ADF/blob/main/PYSPARK_PRACTICE.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window


In [4]:
# Create the Spark session for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("SparkByExamples.com")
    .getOrCreate()
)



In [3]:
# Load the transactions JSON file into a DataFrame.
# NOTE(review): "header" is normally a CSV reader option; confirm it has any effect for JSON.
df = (
    spark.read.format('json')
    .options(inferSchema="true", header="true")
    .load('transactions.json')
)

NameError: name 'spark' is not defined

In [None]:
# Preview the contents of df as a text table.
df.show()

+---------------+------+--------+-------+------+
|_corrupt_record|amount|customer| status|txn_id|
+---------------+------+--------+-------+------+
|           NULL|   500|     101|success|     1|
|           NULL|   200|     101| failed|     2|
|           NULL|   800|     102|success|     3|
+---------------+------+--------+-------+------+



In [None]:
# Display df's schema without the _corrupt_record column.
# The result of drop() is not assigned here, so df itself still has the column.
df.drop(col('_corrupt_record'))

DataFrame[amount: bigint, customer: bigint, status: string, txn_id: bigint]

In [None]:
# Print each column of df with its type and nullability.
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- amount: long (nullable = true)
 |-- customer: long (nullable = true)
 |-- status: string (nullable = true)
 |-- txn_id: long (nullable = true)



For each customer:

total amount

successful amount

failed count

Use groupBy + agg

In [None]:
# Per-customer summary: total amount, amount from successful transactions,
# and the number of failed transactions.
is_success = col("status") == "success"
is_failed = col("status") == "failed"

grouped_df = df.groupBy("customer").agg(
    sum('amount').alias('total amount'),
    # Rows that are not successful contribute 0 to the successful total
    sum(when(is_success, col("amount")).otherwise(0)).alias('successful amount'),
    # Summing a 1/0 flag counts the failed rows
    sum(when(is_failed, 1).otherwise(0)).alias('failed count'),
)

In [None]:
# Display the per-customer aggregates.
grouped_df.show()

+--------+------------+-----------------+------------+
|customer|total amount|successful amount|failed count|
+--------+------------+-----------------+------------+
|     101|         700|              500|           1|
|     102|         800|              800|           0|
+--------+------------+-----------------+------------+



In [None]:
# Show df with nulls in the amount column replaced by 0.
# The filled DataFrame is only displayed, not assigned, so df is left unchanged.
df.fillna(0, subset=['amount']).show()

+------+--------+-------+------+
|amount|customer| status|txn_id|
+------+--------+-------+------+
|   500|     101|success|     1|
|   200|     101| failed|     2|
|   800|     102|success|     3|
+------+--------+-------+------+



In [None]:
# Drop the _corrupt_record column and rebind df to the result.
df = df.drop(col('_corrupt_record'))

In [None]:
# Display the per-customer aggregates again.
grouped_df.show()

+--------+------------+-----------------+------------+
|customer|total amount|successful amount|failed count|
+--------+------------+-----------------+------------+
|     101|         700|              500|           1|
|     102|         800|              800|           0|
+--------+------------+-----------------+------------+



In [None]:
# Save the aggregates as Parquet; "overwrite" replaces any existing output at this path.
grouped_df.write.mode("overwrite").parquet("grouped_df.parquet")


In [None]:
# Read the Parquet output back and display it.
spark.read.parquet("grouped_df.parquet").show()

+--------+------------+-----------------+------------+
|customer|total amount|successful amount|failed count|
+--------+------------+-----------------+------------+
|     101|         700|              500|           1|
|     102|         800|              800|           0|
+--------+------------+-----------------+------------+



In [None]:
# Register df as the temporary view "df_view" so it can be queried with SQL,
# then select every row from the view.
df.createOrReplaceTempView("df_view")
spark.sql("select * from df_view").show()

+------+--------+-------+------+
|amount|customer| status|txn_id|
+------+--------+-------+------+
|   500|     101|success|     1|
|   200|     101| failed|     2|
|   800|     102|success|     3|
+------+--------+-------+------+



In [None]:
# Customers with more than three failed transactions.
spark.sql('''select customer
          from df_view
          where status = 'failed'
          group by customer
          having count(*) > 3''').show()

+--------+
|customer|
+--------+
+--------+



In [None]:
# Second-highest amount: the largest amount strictly below the overall maximum.
# NOTE(review): the alias says "salary" although the column being queried is amount.
spark.sql('''select max(amount) as second_high_salary
          from df_view
          where amount < (select max(amount)
          from df_view)''').show()

+------------------+
|second_high_salary|
+------------------+
|               500|
+------------------+



In [None]:
# Count the lines of log.txt that contain the substring "ERROR".
with open('log.txt', 'r') as f:
    error_count = sum(1 for line in f if "ERROR" in line)

print(f"Number of lines containing 'ERROR': {error_count}")

Number of lines containing 'ERROR': 4


In [None]:
# Write a small UTF-8 text file using explicit open() options, then read it back.
lines_to_write = (
    'Hello, world!\n',
    'This is another line.\n',
    'Here are some special characters: \u03B1\u03B2\u03B3\n',  # Greek letters
)

with open(
    'my_example_file.txt',
    mode='w+',         # read/write; truncates any existing file
    encoding='utf-8',
    errors='replace',
    newline='',        # no newline translation on write or read
    buffering=1,       # line-buffered in text mode
) as f:
    for text in lines_to_write:
        f.write(text)

    # Rewind to the start so the text just written can be read back
    f.seek(0)
    content = f.read()
    print("File content:\n" + content)

# print() with custom 'sep' and 'end'.
# end='!\n' still ends the line, so the following print starts on a new line.
words = ("These", "are", "separated", "by", "-")
print(*words, sep='-', end='!\n')
print("This is on the same line because of the previous 'end' parameter.")

File content:
Hello, world!
This is another line.
Here are some special characters: αβγ

These-are-separated-by--!
This is on the same line because of the previous 'end' parameter.


In [None]:
import io

import pandas as pd

# NOTE(review): `data_string` is not defined in any visible cell; this cell
# presumably relies on a deleted or unsaved cell holding CSV text. Confirm.
# Parse the CSV text into a pandas DataFrame via an in-memory text buffer.
csv_buffer = io.StringIO(data_string)
df_from_string = pd.read_csv(csv_buffer)

# Write the DataFrame to disk without the index column
output_path = 'transactions_new.csv'
df_from_string.to_csv(output_path, index=False)

print("Data saved to 'transactions_new.csv'")

In [None]:
# Display the first few rows of the new DataFrame to confirm the CSV text was parsed
display(df_from_string.head())

In [15]:
# Load the transactions CSV, treating the first line as column names and
# letting Spark infer the column types.
df = (
    spark.read.format('csv')
    .options(inferSchema="true", header="true")
    .load('transactions_20rows.csv')
)

In [14]:
# Preview the transactions loaded from the CSV file.
df.show()

+------+-----------+------+-------+------------+-------------------+
|txn_id|customer_id|amount| status|payment_mode|          timestamp|
+------+-----------+------+-------+------------+-------------------+
|     1|        101|   500|success|         UPI|2025-01-12 10:23:11|
|     2|        101|  1200| failed|        Card|2025-01-12 10:24:55|
|     3|        102|   800|success|         UPI|2025-01-12 10:28:01|
|     4|        103|  1500| failed|         UPI|2025-01-12 10:30:11|
|     5|        102|   300|success|      Wallet|2025-01-13 11:10:22|
|     6|        101|   500|success|         UPI|2025-01-13 11:12:01|
|     7|        104|  2500| failed|        Card|2025-01-13 12:44:19|
|     8|        105|   900|success|         UPI|2025-01-14 09:01:41|
|     9|        105|   900|success|         UPI|2025-01-14 09:01:41|
|    10|        103|   600|success|      Wallet|2025-01-14 09:22:01|
|    11|        104|  2500| failed|        Card|2025-01-14 09:23:55|
|    12|        106|   700|success

In [17]:
import re

# Sample application log used as input for the regex exercise.
string = """2025-01-12 10:23:11 INFO  /Account/Login:7983045718:Amul@12664  Login attempt
2025-01-12 10:24:55 ERROR Payment failed for user_id=101 amount=1200 reason=InsufficientBalance
2025-01-12 10:25:32 INFO  /user/profile/view?uid=2001 Request OK
2025-01-12 10:26:10 WARN  Suspicious URL detected: /Account/Login:9812345621:password123
2025-01-12 10:27:49 ERROR API timeout at /api/v2/upi/transfer txn_id=9982001
2025-01-12 10:28:01 INFO  Payment success user_id=102 amount=500 mode=UPI
2025-01-12 10:29:15 ERROR Fraud attempt detected IP=192.168.1.88 multiple_failed_logins=5
2025-01-12 10:30:44 INFO  /Account/Login:9876543210:admin@@12  Login attempt
2025-01-12 10:31:55 WARN  High latency at /api/v1/checkBalance user=109
2025-01-12 10:33:17 ERROR  /Account/Login:7689541230:root@123  Suspicious credential exposure
"""

# Raw string: in a normal string literal '\d' is an invalid escape sequence and
# triggers a warning, while r'\d+' hands the backslash to the regex engine as-is.
regex = r'\d+'
# Every maximal run of digits in the log, in order of appearance
match = re.findall(regex, string)
print(match)




['2025', '01', '12', '10', '23', '11', '7983045718', '12664', '2025', '01', '12', '10', '24', '55', '101', '1200', '2025', '01', '12', '10', '25', '32', '2001', '2025', '01', '12', '10', '26', '10', '9812345621', '123', '2025', '01', '12', '10', '27', '49', '2', '9982001', '2025', '01', '12', '10', '28', '01', '102', '500', '2025', '01', '12', '10', '29', '15', '192', '168', '1', '88', '5', '2025', '01', '12', '10', '30', '44', '9876543210', '12', '2025', '01', '12', '10', '31', '55', '1', '109', '2025', '01', '12', '10', '33', '17', '7689541230', '123']


  regex = '\d+'


In [23]:
import re

# Message containing a numeric one-time password to extract.
string = """Your OTP for login is 897654. Do not share with anyone."""

# Raw string: in a normal string literal '\d' is an invalid escape sequence and
# triggers a warning, while r'\d+' hands the backslash to the regex engine as-is.
regex = r'\d+'
# The first run of digits in the message is taken as the OTP
match = re.findall(regex, string)[0]
print(match)

897654


  regex = '\d+'


In [30]:
# Count how many lines of the log file mention each log level.
log_file = "/content/sample_data/ogfile_sample_10_lines.txt"

counts = dict.fromkeys(("INFO", "WARN", "ERROR", "DEBUG"), 0)

with open(log_file, "r") as f:
    for line in f:
        # Substring test: a line is counted once for every level name it contains
        for level in counts:
            counts[level] += level in line

print(counts)


{'INFO': 4, 'WARN': 2, 'ERROR': 4, 'DEBUG': 0}


In [31]:
# Print the column names and types of the CSV-backed df.
df.printSchema()

root
 |-- txn_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- amount: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- payment_mode: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [32]:
# Replace nulls in the integer columns with 0.
# na.fill() returns a new DataFrame instead of modifying df, so the result is
# assigned back; without the assignment the fill is discarded and later cells
# would still see the nulls.
df = df.na.fill(value=0, subset=['txn_id', 'customer_id', 'amount'])
# Leave df as the last expression so the cell still displays the DataFrame summary
df

DataFrame[txn_id: int, customer_id: int, amount: int, status: string, payment_mode: string, timestamp: timestamp]

In [37]:
# Show only the first 5 rows of df.
df.show(5)

+------+-----------+------+-------+------------+-------------------+
|txn_id|customer_id|amount| status|payment_mode|          timestamp|
+------+-----------+------+-------+------------+-------------------+
|     1|        101|   500|success|         UPI|2025-01-12 10:23:11|
|     2|        101|  1200| failed|        Card|2025-01-12 10:24:55|
|     3|        102|   800|success|         UPI|2025-01-12 10:28:01|
|     4|        103|  1500| failed|         UPI|2025-01-12 10:30:11|
|     5|        102|   300|success|      Wallet|2025-01-13 11:10:22|
+------+-----------+------+-------+------------+-------------------+
only showing top 5 rows


In [38]:
# Flag transactions above 1000 as high value; every other row gets False.
high_value_flag = when(col("amount") > 1000, True).otherwise(False)
df = df.withColumn("is_high_value", high_value_flag)

In [39]:
# Display df including the new is_high_value column.
df.show()

+------+-----------+------+-------+------------+-------------------+-------------+
|txn_id|customer_id|amount| status|payment_mode|          timestamp|is_high_value|
+------+-----------+------+-------+------------+-------------------+-------------+
|     1|        101|   500|success|         UPI|2025-01-12 10:23:11|        false|
|     2|        101|  1200| failed|        Card|2025-01-12 10:24:55|         true|
|     3|        102|   800|success|         UPI|2025-01-12 10:28:01|        false|
|     4|        103|  1500| failed|         UPI|2025-01-12 10:30:11|         true|
|     5|        102|   300|success|      Wallet|2025-01-13 11:10:22|        false|
|     6|        101|   500|success|         UPI|2025-01-13 11:12:01|        false|
|     7|        104|  2500| failed|        Card|2025-01-13 12:44:19|         true|
|     8|        105|   900|success|         UPI|2025-01-14 09:01:41|        false|
|     9|        105|   900|success|         UPI|2025-01-14 09:01:41|        false|
|   

Use a Window function to get the previous transaction amount per user.

In [56]:
# One window per customer, ordered by time; txn_id breaks ties between equal timestamps.
WindowSpec = (
    Window
    .partitionBy('customer_id')
    .orderBy('timestamp', 'txn_id')
)

TypeError: 'function' object is not subscriptable

In [48]:
# Amount of the preceding row within each customer's window
# (null for a customer's first transaction), then sort the result by txn_id.
previous_amount = lag(col("amount"), 1).over(WindowSpec)
df = (
    df.withColumn("previous_transaction_amount", previous_amount)
    .orderBy(col('txn_id').asc())
)

In [49]:
# Display df including the previous_transaction_amount column.
df.show()

+------+-----------+------+-------+------------+-------------------+-------------+---------------------------+
|txn_id|customer_id|amount| status|payment_mode|          timestamp|is_high_value|previous_transaction_amount|
+------+-----------+------+-------+------------+-------------------+-------------+---------------------------+
|     1|        101|   500|success|         UPI|2025-01-12 10:23:11|        false|                       NULL|
|     2|        101|  1200| failed|        Card|2025-01-12 10:24:55|         true|                        500|
|     3|        102|   800|success|         UPI|2025-01-12 10:28:01|        false|                       NULL|
|     4|        103|  1500| failed|         UPI|2025-01-12 10:30:11|         true|                       NULL|
|     5|        102|   300|success|      Wallet|2025-01-13 11:10:22|        false|                        800|
|     6|        101|   500|success|         UPI|2025-01-13 11:12:01|        false|                       1200|
|

In [50]:
# Mark df for persistence, then immediately remove that marking again,
# so this cell leaves df unpersisted.
df.persist()
df.unpersist()

DataFrame[txn_id: int, customer_id: int, amount: int, status: string, payment_mode: string, timestamp: timestamp, is_high_value: boolean, previous_transaction_amount: int]

In [51]:
# Mark df for caching so later actions can reuse the computed result.
df.cache()

DataFrame[txn_id: int, customer_id: int, amount: int, status: string, payment_mode: string, timestamp: timestamp, is_high_value: boolean, previous_transaction_amount: int]

In [52]:
# Display df after it has been marked for caching.
df.show()

+------+-----------+------+-------+------------+-------------------+-------------+---------------------------+
|txn_id|customer_id|amount| status|payment_mode|          timestamp|is_high_value|previous_transaction_amount|
+------+-----------+------+-------+------------+-------------------+-------------+---------------------------+
|     1|        101|   500|success|         UPI|2025-01-12 10:23:11|        false|                       NULL|
|     2|        101|  1200| failed|        Card|2025-01-12 10:24:55|         true|                        500|
|     3|        102|   800|success|         UPI|2025-01-12 10:28:01|        false|                       NULL|
|     4|        103|  1500| failed|         UPI|2025-01-12 10:30:11|         true|                       NULL|
|     5|        102|   300|success|      Wallet|2025-01-13 11:10:22|        false|                        800|
|     6|        101|   500|success|         UPI|2025-01-13 11:12:01|        false|                       1200|
|

In [55]:
import pyspark.sql.functions as F

# The 'explode' function requires an Array or Map type column.
# The 'payment_mode' column is currently a String type, which is why the error occurred.
# If you intend to split the string into an array, you would first need to use a function like split() to convert it.
# For example: df_exploded = df.withColumn("tag", F.explode(F.split(F.col("payment_mode"), ",")))
# However, based on the current data, 'payment_mode' seems to hold single values, not delimited strings.
# Therefore, the explode function is not appropriate for this column in its current form.

# The line causing the error has been commented out:
# df_exploded = df.withColumn("tag", F.explode("payment_mode"))
# df_exploded.show()
