# Creating Spark Session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

24/04/06 17:53:34 WARN Utils: Your hostname, Gauravs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.104 instead (on interface en0)
24/04/06 17:53:34 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/06 17:53:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Access Spark UI
spark

# Note
These questions are taken from <a href="https://www.linkedin.com/groups/14422071/">PySpark Coding Question (Linkedln Group)</a>

# Q1 :  
```
Find the trending  s
If a   appears more than once in a single comment, still the   occurrence will be considered as 1.

data = [
 ("Great day! 😊  #happy  #sunny  #workout",),
 ("Enjoying the weekend!  #fun  #fun  #funny  #friends",),
 ("Coding all day.  #work  #code  #productive  #code  #work",),
 ("Another day at the office.  #work  #busy",),
 (" #happy  #sunny  #workout  #happy",),
 ("Late-night coding session.  #code  #productive  #code  #tired",),
 ("Fun time with friends.  #fun  #friends  #happy  #fun",),
 (" #happy  #coding 😊  #productive  #happy",),
 (" #funny  #funny  #funny  #humor",),
 ("Working on a project.  #work  #project  #productive",),
]

Have a more optimized solution? Please comment.
Scope of improvement - avoid UDF , Handle corner case such as case sensitivity etc
```

In [3]:
from pyspark.sql.types import StringType,StructField,StructType

data = [
 ("Great day! 😊  #happy  #sunny  #workout",),
 ("Enjoying the weekend!  #fun  #fun  #funny  #friends",),
 ("Coding all day.  #work  #code  #productive  #code  #work",),
 ("Another day at the office.  #work  #busy",),
 (" #happy  #sunny  #workout  #happy",),
 ("Late-night coding session.  #code  #productive  #code  #tired",),
 ("Fun time with friends.  #fun  #friends  #happy  #fun",),
 (" #happy  #coding 😊  #productive  #happy",),
 (" #funny  #funny  #funny  #humor",),
 ("Working on a project.  #work  #project  #productive",),
]

schema = StructType([
    StructField('post',StringType(),True)
])

df = spark.createDataFrame(data,schema=schema)
df.show()

                                                                                

+--------------------+
|                post|
+--------------------+
|Great day! 😊  #h...|
|Enjoying the week...|
|Coding all day.  ...|
|Another day at th...|
| #happy  #sunny  ...|
|Late-night coding...|
|Fun time with fri...|
| #happy  #coding ...|
| #funny  #funny  ...|
|Working on a proj...|
+--------------------+



In [4]:
from pyspark.sql.functions import split,col
split_data = df.select(split(col('post'),' ').alias('split'))
split_data.show()

+--------------------+
|               split|
+--------------------+
|[Great, day!, 😊,...|
|[Enjoying, the, w...|
|[Coding, all, day...|
|[Another, day, at...|
|[, #happy, , #sun...|
|[Late-night, codi...|
|[Fun, time, with,...|
|[, #happy, , #cod...|
|[, #funny, , #fun...|
|[Working, on, a, ...|
+--------------------+



In [5]:
# you can also make use of array_distinct in spark dataframe if you dont want to use set in udf
def get_only_hashtags_unique(arr):
    li = set()
    for x in arr:
        if x.strip().startswith('#'):
            li.add(x.strip())
    return list(li)

In [6]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType,StringType

unique_only_hashtag = udf(get_only_hashtags_unique,ArrayType(StringType()))

unique_hashtags = split_data.select(unique_only_hashtag(col('split')).alias('only_hashtag'))
unique_hashtags.show()

                                                                                

+--------------------+
|        only_hashtag|
+--------------------+
|[#sunny, #workout...|
|[#fun, #funny, #f...|
|[#code, #work, #p...|
|      [#busy, #work]|
|[#sunny, #workout...|
|[#code, #producti...|
|[#fun, #friends, ...|
|[#coding, #produc...|
|    [#funny, #humor]|
|[#work, #project,...|
+--------------------+



In [7]:
from pyspark.sql.functions import explode

explode_data = unique_hashtags.select(explode(col('only_hashtag')).alias('explode'))
explode_data.show()

                                                                                

+-----------+
|    explode|
+-----------+
|     #sunny|
|   #workout|
|     #happy|
|       #fun|
|     #funny|
|   #friends|
|      #code|
|      #work|
|#productive|
|      #busy|
|      #work|
|     #sunny|
|   #workout|
|     #happy|
|      #code|
|#productive|
|     #tired|
|       #fun|
|   #friends|
|     #happy|
+-----------+
only showing top 20 rows



In [8]:
get_count = explode_data.groupBy(col('explode')).count()
get_count.show()

[Stage 12:>                                                         (0 + 8) / 8]

+-----------+-----+
|    explode|count|
+-----------+-----+
|     #happy|    4|
|     #sunny|    2|
|   #workout|    2|
|       #fun|    2|
|   #friends|    2|
|     #funny|    2|
|#productive|    4|
|      #code|    2|
|      #work|    3|
|      #busy|    1|
|     #tired|    1|
|    #coding|    1|
|   #project|    1|
|     #humor|    1|
+-----------+-----+



                                                                                

In [9]:
# from pyspark.sql.functions import sort
get_count.select('*').orderBy(col('count').desc()).show()

[Stage 15:>                                                         (0 + 8) / 8]

+-----------+-----+
|    explode|count|
+-----------+-----+
|     #happy|    4|
|#productive|    4|
|      #work|    3|
|     #sunny|    2|
|   #workout|    2|
|       #fun|    2|
|   #friends|    2|
|     #funny|    2|
|      #code|    2|
|      #busy|    1|
|     #tired|    1|
|    #coding|    1|
|   #project|    1|
|     #humor|    1|
+-----------+-----+



                                                                                

In [10]:
# Final Code
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

from pyspark.sql.types import StringType,StructField,StructType
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType,StringType
from pyspark.sql.functions import split,col
from pyspark.sql.functions import explode


# UDF
def get_only_hashtags_unique(arr):
    li = set()
    for x in arr:
        if x.strip().startswith('#'):
            li.add(x.strip())
    return list(li)
#Register UDF
unique_only_hashtag = udf(get_only_hashtags_unique,ArrayType(StringType()))


# Stage 1
data = [
 ("Great day! 😊  #happy  #sunny  #workout",),
 ("Enjoying the weekend!  #fun  #fun  #funny  #friends",),
 ("Coding all day.  #work  #code  #productive  #code  #work",),
 ("Another day at the office.  #work  #busy",),
 (" #happy  #sunny  #workout  #happy",),
 ("Late-night coding session.  #code  #productive  #code  #tired",),
 ("Fun time with friends.  #fun  #friends  #happy  #fun",),
 (" #happy  #coding 😊  #productive  #happy",),
 (" #funny  #funny  #funny  #humor",),
 ("Working on a project.  #work  #project  #productive",),
]

schema = StructType([
    StructField('post',StringType(),True)
])

df = spark.createDataFrame(data,schema=schema)
split_data = df.select(split(col('post'),' ').alias('split'))
unique_hashtags = split_data.select(unique_only_hashtag(col('split')).alias('only_hashtag'))
explode_data = unique_hashtags.select(explode(col('only_hashtag')).alias('explode'))


# Stage 2
get_count = explode_data.groupBy(col('explode')).count()
# from pyspark.sql.functions import sort
get_count.select('*').orderBy(col('count').desc()).show()

[Stage 18:>                                                         (0 + 8) / 8]

+-----------+-----+
|    explode|count|
+-----------+-----+
|     #happy|    4|
|#productive|    4|
|      #work|    3|
|     #sunny|    2|
|   #workout|    2|
|       #fun|    2|
|   #friends|    2|
|     #funny|    2|
|      #code|    2|
|      #busy|    1|
|     #tired|    1|
|    #coding|    1|
|   #project|    1|
|     #humor|    1|
+-----------+-----+



                                                                                

In [11]:
df.rdd.getNumPartitions()

8

In [12]:
get_count.rdd.getNumPartitions()

[Stage 21:>                                                         (0 + 8) / 8]

1

# Q2

##### Task - Write a Spark code snippet to group data on multiples of 100 and get max number from that group.

![q2.png](attachment:q2.png)

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType
from pyspark.sql.functions import *

# Create a SparkSession
spark = SparkSession.builder \
 .appName("CreateDataFrameExample") \
 .getOrCreate()
spark

24/04/06 17:53:56 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [14]:
# Define the schema for the DataFrame
schema = StructType([
 StructField("nums", IntegerType(), True),
])

# Sample numbers data
data = [(1,),(3,),(19,),(105,),(20,),(106,),(45678,),(305,),(399,)]

# Create a DataFrame from the sample data and schema
nums_df = spark.createDataFrame(data, schema)

# Show the DataFrame
nums_df.show()

                                                                                

+-----+
| nums|
+-----+
|    1|
|    3|
|   19|
|  105|
|   20|
|  106|
|45678|
|  305|
|  399|
+-----+



In [15]:
from pyspark.sql.functions import ceil
bucket = nums_df.select(col('*'),ceil((col('nums')/100)).alias('bucket'))
bucket.show()

+-----+------+
| nums|bucket|
+-----+------+
|    1|     1|
|    3|     1|
|   19|     1|
|  105|     2|
|   20|     1|
|  106|     2|
|45678|   457|
|  305|     4|
|  399|     4|
+-----+------+



In [16]:
from pyspark.sql.functions import collect_list
bucket.groupBy('bucket').agg(collect_list('nums').alias('bucket_data'),\
                            max('nums').alias('max_in_bucket'))\
                            .select('bucket_data','max_in_bucket').show()

+--------------+-------------+
|   bucket_data|max_in_bucket|
+--------------+-------------+
|[1, 3, 19, 20]|           20|
|    [105, 106]|          106|
|       [45678]|        45678|
|    [305, 399]|          399|
+--------------+-------------+



# Q3 : Mask the digits of a 16-digit credit card number

![q3.png](attachment:q3.png)

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, lit, expr

# Create a Spark session
spark = SparkSession.builder.appName("MaskCreditCard").getOrCreate()

# Define the data
data = [
 (1, "Rahul", "1234567891234567"),
 (2, "Raj", "1004567892345678"),
 (3, "Priya", "0234567893456789"),
 (4, "Murti", "2234567890123456")
]

# Define the schema
schema = ["ID", "Name", "CreditCard"]

# Create a DataFrame
df = spark.createDataFrame(data, schema)

# Mask the credit card number
masked_df = df.withColumn("MaskedCreditCard", 
 concat(
 df.CreditCard.substr(1, 4), # First 4 digits
 lit("********"), # 8 asterisks
 df.CreditCard.substr(13, 4) # Last 4 digits
 ))

# Show the original and masked credit card numbers
masked_df.select("ID", "Name", "CreditCard", "MaskedCreditCard").show()

24/04/06 17:53:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


+---+-----+----------------+----------------+
| ID| Name|      CreditCard|MaskedCreditCard|
+---+-----+----------------+----------------+
|  1|Rahul|1234567891234567|1234********4567|
|  2|  Raj|1004567892345678|1004********5678|
|  3|Priya|0234567893456789|0234********6789|
|  4|Murti|2234567890123456|2234********3456|
+---+-----+----------------+----------------+

