<a href="https://colab.research.google.com/github/gitanujjain/MLdataset/blob/master/Rdd_operation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar -xf spark-3.5.0-bin-hadoop3.tgz
!pip install -q findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [4]:
print(spark)
print(spark.sparkContext)
sc=spark.sparkContext

<pyspark.sql.session.SparkSession object at 0x7b723127a020>
<SparkContext master=local appName=Colab>


# RDD METHOD

In [None]:
print(sc.appName)

Colab


In [None]:
# create The RDD
n_rdd=sc.range(1,5)
n_rdd.collect()

[1, 2, 3, 4]

In [None]:
text_rdd=sc.textFile('/content/sample_data/mnist_test.csv')
print(text_rdd)

/content/sample_data/mnist_test.csv MapPartitionsRDD[5] at textFile at NativeMethodAccessorImpl.java:0


In [None]:

#sc.stop()

In [None]:
text_rdd2=sc.wholeTextFiles('/content/sample_data/mnist_test.csv')
print(text_rdd2)

 **map(func)**:

---
Definition: Applies a function to each element in the RDD and returns a new RDD with the results.

In [None]:
rdd=sc.parallelize([1,2,3])
squrd_rdd=rdd.map(lambda x:x**2)
print(squrd_rdd.collect())

[1, 4, 9]


**filter(func):**

---

Definition: Returns a new RDD with elements that satisfy the given predicate (function).

In [None]:
rdd=sc.parallelize(range(1,11))

In [None]:
filter_rdd=rdd.filter(lambda x:x%2==0)
print(filter_rdd.collect())

[2, 4, 6, 8, 10]


**flatMap(func):**

---
Definition: Similar to `map`, but each input item can be mapped to 0 or more output items.

In [None]:
word_rdd=sc.parallelize(['Hello world', 'Apache Spark'])

In [None]:
flatmap_word_rdd=word_rdd.flatMap(lambda line: line.split(" "))
print(flatmap_word_rdd.collect())

['Hello', 'world', 'Apache', 'Spark']


**distinct(numPartitions=None):**

---
Definition: Returns a new RDD with distinct elements.

In [None]:
rdd = sc.parallelize([1, 2, 2, 3, 3, 3])
distinct_rdd = rdd.distinct()
print(distinct_rdd.collect())

[1, 2, 3]


**sample(withReplacement, fraction, seed=None):**

---
Definition: Returns a random sample of the RDD.


In [None]:
rdd=sc.parallelize([1,2,3,4,5,5,6])
sampler=rdd.sample(withReplacement=False, fraction=0.5)
print(sampler.collect())

[1, 2, 5, 5, 6]


**union(other):**

---
Definition: Returns a new RDD that contains the union of the elements in the source RDD and the other RDD.

In [None]:
rdd1=sc.parallelize([1,2,3,4,5,6])
rdd2=sc.parallelize([11,22,33,44,55])
rdd3=rdd1.union(rdd2)
print(rdd3.collect())

[1, 2, 3, 4, 5, 6, 11, 22, 33, 44, 55]


**intersection(other):**
Definition: Returns a new RDD with common elements between the source RDD and the other RDD.

In [None]:
rdd4=rdd1.intersection(rdd2)
print(rdd4.collect())

[]


**subtract(other):**

---

Definition: Returns a new RDD with elements from the source RDD that are not present in the other RDD.

In [None]:
rdd5=rdd1.subtract(rdd2)
print(rdd5.collect())

[2, 4, 6, 1, 3, 5]


**zip(other):**

---
Definition: Returns a new RDD by pairing elements from two RDDs.

In [None]:
rdd1=sc.parallelize(['a','b','c','d'])
rdd2=sc.parallelize([1,2,3,4])
rdd6=rdd1.zip(rdd2)
print(rdd6.collect())

[('a', 1), ('b', 2), ('c', 3), ('d', 4)]


**zipWithIndex():**
Definition: Returns a new RDD by adding index to each element of the RDD.

In [None]:
rdd=sc.parallelize(["A","B","C"])
index_rdd=rdd.zipWithIndex()
print(index_rdd.collect())

[('A', 0), ('B', 1), ('C', 2)]


In [None]:
u_index_rdd=rdd.zipWithUniqueId()
print(u_index_rdd.collect())

[('A', 0), ('B', 1), ('C', 2)]


**cartesian(other):**

---
Definition: Returns the Cartesian product of the source RDD and the other RDD.

In [None]:
rdd1=sc.parallelize([1,2,3])
rdd2=sc.parallelize(['A','B'])
rdd3=rdd1.cartesian(rdd2)
print(rdd3.collect())

[(1, 'A'), (1, 'B'), (2, 'A'), (2, 'B'), (3, 'A'), (3, 'B')]


**groupByKey(numPartitions=None):**

---


Definition: Groups elements by key and returns an RDD of `(key, iterable)` pairs.

In [None]:
data=[('A',1),('B',2),('C',3),('A',2),('A',3),('C',3)]
rdd_data=sc.parallelize(data)
grouped_rdd=rdd_data.groupByKey()
print(grouped_rdd.collect())
print(grouped_rdd.mapValues(list).collect())

[('A', <pyspark.resultiterable.ResultIterable object at 0x7aaf9622a920>), ('B', <pyspark.resultiterable.ResultIterable object at 0x7aaf96228c10>), ('C', <pyspark.resultiterable.ResultIterable object at 0x7aaf9622a1d0>)]
[('A', [1, 2, 3]), ('B', [2]), ('C', [3, 3])]


**reduceByKey(func, numPartitions=None):**

---


Definition: Aggregates values of each key using a specified function.

In [None]:
reduce_rdd=rdd_data.reduceByKey(lambda a,b:a+b)
print(reduce_rdd.collect())

[('A', 6), ('B', 2), ('C', 6)]


sortByKey(ascending=True, numPartitions=None):
Definition: Sorts the RDD by key.

---



In [None]:
data=[("A",1),("B",3), ("C",0)]
rdd_1=sc.parallelize(data)
sorted_rdd=rdd_1.sortByKey()
print(sorted_rdd.collect())

[('A', 1), ('B', 3), ('C', 0)]


**mapPartitions(func):**

---


Definition: Applies a function to each partition of the RDD.

In [None]:
def partition_sum(iterator):
  yield sum(iterator)

rdd_2=sc.parallelize([1,2,3,4,5],3)
partition_rdd=rdd_2.mapPartitions(partition_sum)
partition_rdd.collect()

[1, 5, 9]

mapPartitionsWithIndex(func):
Definition: Applies a function to each partition of the RDD with index.

In [None]:
def partition_sum_with_index(index, iterator):
  yield index , sum(iterator)

rdd_3=sc.parallelize([1,2,3,4,5,6],2)
partition_index_sum=rdd_3.mapPartitionsWithIndex(partition_sum_with_index)
partition_index_sum.collect()

[(0, 6), (1, 15)]

**sample(withReplacement, fraction, seed=None)**

---


Definition: Returns a random sample of the RDD.

In [6]:
rdd=sc.parallelize([1,2,3,4,5,6,7])
sample_rdd=rdd.sample(withReplacement=False, fraction=0.5)
print(sample_rdd.collect())

[3, 4, 5, 6]


**distinct(numPartitions=None):**

---


Definition: Returns a new RDD with distinct elements.

In [7]:
rdd=sc.parallelize([1,2,3,4,2,3,4,5,6,5,3,2,2,1,1,3,3])
distinct_rdd=rdd.distinct()
print(distinct_rdd.collect())

[1, 2, 3, 4, 5, 6]


**groupByKey(numPartitions=None):**

---


Definition: Groups elements by key and returns an RDD of `(key, iterable)` pairs.

In [10]:
data = [("A", 1),("B", 2), ("A",3), ("C", 4)]
rdd=sc.parallelize(data)
grouped_rdd=rdd.groupByKey()
print(grouped_rdd.mapValues(list).collect())

[('A', [1, 3]), ('B', [2]), ('C', [4])]


**reduceByKey(func, numPartitions=None):**

---

Definition: Aggregates values of each key using a specified function.

In [11]:
reduce_rdd=rdd.reduceByKey(lambda a,b:a+b)
print(reduce_rdd.collect())

[('A', 4), ('B', 2), ('C', 4)]


**sortByKey(ascending=True, numPartitions=None):**

---

Definition: Sorts the RDD by key.

In [13]:
data =[("B",2),("A",1),("C",3)]
rdd= sc.parallelize(data)
sorted_rdd=rdd.sortByKey()
print(sorted_rdd.collect())

[('A', 1), ('B', 2), ('C', 3)]


** coalesce(numPartitions):**

---

Definition: Reduces the number of partitions in the RDD to the specified number.

In [15]:
rdd=sc.parallelize([1,2,3,4,5,6,7,8,9,0],5)
coll_rdd= rdd.coalesce(2)
print(coll_rdd.getNumPartitions())

2


**repartition(numPartitions):**

---


Definition: Reshuffles the data in the RDD and creates the specified number of partitions.

In [16]:
rdd=sc.parallelize([1,2,3,4,5,6,7,8,9],2)
repa_rdd=rdd.repartition(4)
print(repa_rdd.getNumPartitions())

4


**mapValues(func):**

---
Definition: Applies a function to the values of each key-value pair in the RDD.

In [17]:
data=[("A",1),("B",2),("A",4)]
rdd=sc.parallelize(data)
map_vale_rdd=rdd.mapValues(lambda x: x+10)
print(map_vale_rdd.collect())

[('A', 11), ('B', 12), ('A', 14)]


**flatMapValues(func):**

---
Definition: Similar to `mapValues`, but each input item can be mapped to 0 or more output items.

In [18]:
data=[("A",[1,2]), ("B",[3]),("A",[4,5])]
rdd=sc.parallelize(data)
new_rdd=rdd.flatMapValues(lambda x:x)
print(new_rdd.collect())

[('A', 1), ('A', 2), ('B', 3), ('A', 4), ('A', 5)]


aggregateByKey(zeroValue, seqFunc, combFunc, numPartitions=None):
Definition: Aggregates the values of each key using a given function.

In [22]:
data=[("A",1),("A",2),("B",1),('B',2),('C',3)]
rdd=sc.parallelize(data)
aggregated_rdd=rdd.aggregateByKey(lambda x:(x,1),
                                  lambda acc,value:(acc[0]+value,acc[1]+1),
                                  lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))


**combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions=None):**

---

Definition: Aggregates values of each key by first applying a combine function per partition, and then combining the results across partitions.

In [5]:
data = [('A',1),('B',2),('A',3),('C',4)]
rdd= sc.parallelize(data)
combine_data=rdd.combineByKey(lambda x: (x,1), lambda acc, value:(acc[0]+value,acc[1]+1),lambda acc1, acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))
print(combine_data.collect())

[('A', (4, 2)), ('B', (2, 1)), ('C', (4, 1))]


**pipe(command, env=None):**

---


Definition: Pipes the elements of the RDD through an external command and returns the output as a new RDD.

In [None]:
rdd=sc.parallelize(["Hello","pyspark"])
pipe_rdd=rdd.pipe("appche pyspark")
print(pipe_rdd.collect())

**glom():**

---
Definition: Returns an RDD where each partition is a list of elements.

In [None]:
rdd=sc.parallelize([1,2,3,4,5],2)
glom_rdd=rdd.glom()
print(glom_rdd.collect())

**keyBy(f):**

---
Definition: Returns a new RDD by creating tuples where the first element is the result of applying the function to each element in the RDD, and the second element is the original element

In [None]:
rdd=sc.parallelize(["A","B","C"])
key_by_rdd=rdd.keyBy(lambda x: x.lower())
print(key_by_rdd.collect())

In [None]:
from pyspark.sql.types import *
from pyspark.sql.types import LongType

**1. Create Empty RDD in PySpark** : there are two method


*   emptyRDD()
*   parallelize()

*Note: If you try to perform operations on empty RDD you going to get ValueError("RDD is empty").*

In [None]:
emptyRDD = spark.sparkContext.emptyRDD()
print(emptyRDD)

In [None]:
rdd2= spark.sparkContext.parallelize(['Hello'])
print(rdd2)

2. Create Empty DataFrame with Schema (StructType)

In [None]:
from pyspark.sql.types import StructType, StructType, StringType

In [None]:
schema = StructType([StructField("firstname",StringType(),True),StructField("middlename", StringType(), True), StructField("lastname", StringType(), True)])

In [None]:
#Now use the empty RDD created above and pass it to createDataFrame() of SparkSession along with the schema for column names & data types.
emptyDF=spark.createDataFrame(emptyRDD, schema)

In [None]:
emptyDF.printSchema()

**3. Convert Empty RDD to DataFrame**: create empty DataFrame by converting empty RDD to DataFrame using toDF()

In [None]:
emptyDF1=emptyRDD.toDF(schema)
emptyDF1.printSchema()

4. Create Empty DataFrame with Schema.


In [None]:
df1=spark.createDataFrame([], schema)
df1.printSchema()

**5. Create Empty DataFrame without Schema (no columns)**: To create empty DataFrame with out schema (no columns) just create a empty schema and use it while creating PySpark DataFrame.

In [None]:
df2=spark.createDataFrame([], StructType())

In [None]:
df2.printSchema()

**PySpark Replace Empty Value With None/null on DataFrame**

In [None]:
data=[("anuj",""),("", "delhi"),("Arham","Ahemmadabad"),("","Jammu")]
df1=spark.createDataFrame(data,["name","location"])
df1.show()

**PySpark Replace Empty Value with None**: In order to replace empty value with None/null on single DataFrame column, you can use withColumn() and when().otherwise() function.

In [None]:
from pyspark.sql.functions import col, when

In [None]:
df1.withColumn("name",when(col("name")=="", None).otherwise(col("name"))).show()

***Replace Empty Value with None on All DataFrame Columns***
To replace an empty value with None/null on all DataFrame columns, use df.columns to get all DataFrame columns, loop through this by applying conditions.


In [None]:
df2=df1.select([when(col(c)=="",None).otherwise(col(c)).alias(c) for c in df1.columns])
df2.show()

**Replace Empty Value with None on Selected Columns**
Similarly, you can also replace a selected list of columns, specify all columns you wanted to replace in a list and use this on same expression above.


In [None]:
replce_col=["location"]
df3=df1.select([when(col(c)=="", None).otherwise(col(c)).alias(c) for c in replce_col])
df3.show()

**PySpark When Otherwise | SQL Case When Usage**: PySpark supports a way to check multiple conditions in sequence and returns a value when the first condition met by using SQL like case when and when().otherwise() expressions, these works similar to “Switch" and "if then else" statements.

PySpark When Otherwise – when() is a SQL function that returns a Column type and otherwise() is a function of Column, if otherwise() is not used, it returns a None/NULL value.

PySpark SQL Case When – This is similar to SQL expression, Usage: CASE WHEN cond1 THEN result WHEN cond2 THEN result... ELSE result END

In [None]:
data=[("james","M",6000),("Michael","M",70000),("Robert",None,400000),("Maria","F",500000),("jen","",None)]

In [None]:
columns=["name","gender","salary"]


In [None]:
df=spark.createDataFrame(data=data,schema=columns)

In [None]:
df.show()

1. Using when() otherwise() on PySpark DataFrame.
PySpark when() is SQL function, in order to use this first you should import and this returns a Column type, otherwise() is a function of Column, when otherwise() not used and none of the conditions met it assigns None (Null) value. Usage would be like when(condition).otherwise(default).

when() function take 2 parameters, first param takes a condition and second takes a literal value or Column, if condition evaluates to true then it returns a value from second param.

The below code snippet replaces the value of gender with a new derived value, when conditions not matched, we are assigning “Unknown” as value, for null assigning empty.



In [None]:
from pyspark.sql.functions import when

In [None]:
df2=df.withColumn("New_column", when(df.gender == "M","MALE")\
                              .when(df.gender == "F","FEMALE")\
                              .when(df.gender.isNull(),"")\
                              .otherwise(df.gender))

In [None]:
df2.show()

In [None]:
from pyspark.sql.functions import col
df2= df.select(col("*"), when(df.gender == "M","Male").when(df.gender=="F","Female").when(df.gender.isNull(),"").otherwise(df.gender).alias("new_gender"))

**2. PySpark SQL Case When on DataFrame**.
If you have a SQL background you might have familiar with Case When statement that is used to execute a sequence of conditions and returns a value when the first condition met, similar to SWITH and IF THEN ELSE statements. Similarly, PySpark SQL Case When statement can be used on DataFrame, below are some of the examples of using with withColumn(), select(), selectExpr() utilizing expr() function.

Syntax of SQL CASE WHEN ELSE END


CASE
    WHEN condition1 THEN result_value1
    WHEN condition2 THEN result_value2
    -----
    -----
    ELSE result
END;

*   CASE is the start of the expression
*   Clause WHEN takes a condition, if condition true it returns a value from THEN
*   If the condition is false it goes to the next condition and so on.
*   If none of the condition matches, it returns a value from the ELSE clause.

*   END is to end the expression.









In [None]:
from pyspark.sql.functions import expr, col
df3=df.withColumn("new_gender", expr("case when gender='M' then 'Male' when gender='F' then 'female' when gender is NULL then '' else gender end"))

In [None]:
df3.show(truncate=False)

In [None]:
df4= df.select(col("*"), expr("case when gender='M' then 'Male' when gender='F' then 'female' when gender is NULL then '' else gender end").alias("new_g"))
df4.show()

**2.2 Using Case When on SQL Expression**
You can also use Case When with SQL statement after creating a temporary view. This returns a similar output as above.

In [None]:
df.createOrReplaceTempView("EMP")
spark.sql("select name , case when gender ='M' then 'MALE'"\
          "when gender ='F' then 'female'"\
          "when gender is null then '' "\
          "else gender end as new_gender from emp").show()

**2.3. Multiple Conditions using & and | operator**
We often need to check with multiple conditions, below is an example of using PySpark When Otherwise with multiple conditions by using and (&) or (|) operators. To explain this I will use a new set of data to make it simple.

PySpark SQL expr() (Expression) Function

PySpark expr() is a SQL function to execute SQL-like expressions and to use an existing DataFrame column value as an expression argument to Pyspark built-in functions.
Most of the commonly used SQL functions are either part of the PySpark Column class or built-in pyspark.
sql.functions API, besides these PySpark also supports many other SQL functions,
so in order to use these, you have to use expr() function.

Below are 2 use cases of PySpark expr() funcion.

First, allowing to use of SQL-like functions that are not present in PySpark Column type & pyspark.sql.functions API. for example CASE WHEN, regr_count().
Second, it extends the PySpark SQL Functions by allowing to use DataFrame columns in functions for expression. for example, if you wanted to add a month value from a column to a Date column. I will explain this in the example below.
**bold text**

In [None]:
data=[("James","Bond"),("AMuj","jain")]
df=spark.createDataFrame(data).toDF("col1","col2")

In [None]:
df.show()

In [None]:
from pyspark.sql.functions import expr
df.withColumn("full name", expr("col1 || ',' || col2")).show()

In [None]:
data =[("James","M"),("Michael","F"),("Jen","")]

In [None]:
columns=["Name","gender"]
df=spark.createDataFrame(data, schema=columns)

In [None]:
df.show()

In [None]:
df2=df.withColumn("gender", expr("case when gender='M' then 'male' when gender='F' then 'Female' else 'unknown' end "))

In [None]:
df2.show()

In [None]:
data2=[("2013-01-01",1),("2010-01-04",2),("2013-01-05",3)]
df=spark.createDataFrame(data2).toDF("date","increment")
df.show()

In [None]:
df.select(df.date, df.increment,expr("add_months(date,increment)").alias("inc_date")).show()

In [None]:
df.select(df.date, df.increment,expr("add_months(date,increment) as inc_date")).show()

In [None]:
df.select("increment", expr("cast(increment as string) as str_increment")).printSchema()

In [None]:
df.select(df.date, df.increment,expr("increment + 5 as new_increment")).show()

In [None]:
data=[(200,200),(500,500),(300,200)]
df=spark.createDataFrame(data, schema=('col1','col2'))

In [None]:
df.show()

In [None]:
df.select(df.col1,df.col2,expr("col1==col2")).show()

In [None]:
df.filter(expr("col1==col2")).show()

**PySpark lit() – Add Literal or Constant to DataFrame**
PySpark SQL functions lit() and typedLit() are used to add a new column to DataFrame by assigning a literal or constant value. Both these functions return Column type as return type.

Both of these are available in PySpark by importing pyspark.sql.functions

In [None]:
data=[("111", 50000),("112",50001),("113", 50003),("114",50004)]
columns=["empid","salary"]
df=spark.createDataFrame(data, schema=columns)

In [None]:
df.show()

In [None]:
from pyspark.sql.functions import col,lit
df2=df.select(col("empid"), col("salary"), lit("1").alias("lit_content"))

In [None]:
df2.show()

In [None]:
from pyspark.sql.functions import when, lit, col

In [None]:
df3=df2.withColumn("lit_value2", when((col("Salary")>=40000) & (col("Salary")<=50000),lit("100")).otherwise(lit("200")))

In [None]:
df3.show()

**typedLit() Function** – Syntax
Difference between lit() and typedLit() is that, typedLit function can handle collection types e.g.: Array, Dictionary(map) e.t.c. Unfortunately, I could not find this function in PySpark, when I find it, I will add an example.

# **Explode**
The `explode` function helps you to transforming arrays or maps within a column into separate rows, creating a more granular view of your data. It transforms each element of an array or each key-value pair of a map into a separate row, making it a must-know for efficient data manipulation.

**Syntex:**
```
df_expanded = df.select("id", explode("your_array_column").alias("exploded_column"))
```

𝑾𝒉𝒚 𝑼𝒔𝒆 `𝒆𝒙𝒑𝒍𝒐𝒅𝒆`?
- 𝐹𝑙𝑎𝑡𝑡𝑒𝑛𝑖𝑛𝑔 𝐴𝑟𝑟𝑎𝑦𝑠: Ideal for scenarios where you want to transform arrays into individual rows.
- 𝐻𝑎𝑛𝑑𝑙𝑖𝑛𝑔 𝑁𝑒𝑠𝑡𝑒𝑑 𝐷𝑎𝑡𝑎: Perfect for working with complex, nested structures common in real-world datasets.

𝑯𝒐𝒘 𝑪𝒂𝒏 𝒀𝒐𝒖 𝑳𝒆𝒗𝒆𝒓𝒂𝒈𝒆 𝑰𝒕?
- 𝑁𝑒𝑠𝑡𝑒𝑑 𝐽𝑆𝑂𝑁 𝑃𝑟𝑜𝑐𝑒𝑠𝑠𝑖𝑛𝑔: Unpack nested JSON arrays or maps for analysis.
- 𝐷𝑎𝑡𝑎 𝑁𝑜𝑟𝑚𝑎𝑙𝑖𝑧𝑎𝑡𝑖𝑜𝑛: Flatten arrays to simplify downstream processing.
- 𝐸𝑥𝑝𝑙𝑜𝑟𝑎𝑡𝑜𝑟𝑦 𝐷𝑎𝑡𝑎 𝐴𝑛𝑎𝑙𝑦𝑠𝑖𝑠: Gain insights from nested structures with ease.



In [None]:
from pyspark.sql.functions import explode

# Sample data
data = [(1, [10, 20, 30]), (2, [40, 50]), (3, [60])]

# Create a DataFrame and showing raw data
df = spark.createDataFrame(data, ["id", "numbers"])
df.show()

+---+------------+
| id|     numbers|
+---+------------+
|  1|[10, 20, 30]|
|  2|    [40, 50]|
|  3|        [60]|
+---+------------+



In [None]:
# Use explode for arrays
df_expanded = df.select("id", explode("numbers").alias("number"))

# Show the result
df_expanded.show()

+---+------+
| id|number|
+---+------+
|  1|    10|
|  1|    20|
|  1|    30|
|  2|    40|
|  2|    50|
|  3|    60|
+---+------+



In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType


In [None]:
data = [("James","","Smith","36636","M",3000),
    ("Michael","Rose","","40288","M",4000),
    ("Robert","","Williams","42114","M",4000),
    ("Maria","Anne","Jones","39192","F",4000),
    ("Jen","Mary","Brown","","F",-1)
  ]



In [None]:
schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])


In [None]:
df = spark.createDataFrame(data=data,schema=schema)


In [None]:
for each in df.collect():
  print(each)

Row(firstname='James', middlename='', lastname='Smith', id='36636', gender='M', salary=3000)
Row(firstname='Michael', middlename='Rose', lastname='', id='40288', gender='M', salary=4000)
Row(firstname='Robert', middlename='', lastname='Williams', id='42114', gender='M', salary=4000)
Row(firstname='Maria', middlename='Anne', lastname='Jones', id='39192', gender='F', salary=4000)
Row(firstname='Jen', middlename='Mary', lastname='Brown', id='', gender='F', salary=-1)


In [None]:
df.printSchema()

root
 |-- firstname: string (nullable = true)
 |-- middlename: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)



In [None]:
df.schema.jsonValue()

{'type': 'struct',
 'fields': [{'name': 'firstname',
   'type': 'string',
   'nullable': True,
   'metadata': {}},
  {'name': 'middlename', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'lastname', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'id', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'gender', 'type': 'string', 'nullable': True, 'metadata': {}},
  {'name': 'salary', 'type': 'integer', 'nullable': True, 'metadata': {}}]}

In [None]:
schemaFromJson = StructType.fromJson(df.schema.jsonValue())


In [None]:
print(schemaFromJson)

StructType([StructField('firstname', StringType(), True), StructField('middlename', StringType(), True), StructField('lastname', StringType(), True), StructField('id', StringType(), True), StructField('gender', StringType(), True), StructField('salary', IntegerType(), True)])


In [None]:
df.createOrReplaceTempView("source_update");

In [None]:
spark.sql("select * from source_update").show()

+---------+----------+--------+-----+------+------+
|firstname|middlename|lastname|   id|gender|salary|
+---------+----------+--------+-----+------+------+
|    James|          |   Smith|36636|     M|  3000|
|  Michael|      Rose|        |40288|     M|  4000|
|   Robert|          |Williams|42114|     M|  4000|
|    Maria|      Anne|   Jones|39192|     F|  4000|
|      Jen|      Mary|   Brown|     |     F|    -1|
+---------+----------+--------+-----+------+------+



**Sapient Test**

I have a data set which contains fields such as: item, event, timestamp, userid while lacking of the sessionId.
I'm expected to create a session _id which expires for every 30 minutes window.
This session _id should be unique per session per user.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F

In [None]:

df=spark.createDataFrame([
("blue","view",1610494094750,11),
("green","addtobag",1510593114350,21),
("red","close",1610493115350,41),
("blue","view",1610494094350,11),
("blue","close",1510593114312,21),
("red","view",1610493114350,41),
("red","view",1610593114350,41),
("green","purchase",1610494094350,31)
],["item","event","timestamp","userId"])

df.show()

+-----+--------+-------------+------+
| item|   event|    timestamp|userId|
+-----+--------+-------------+------+
| blue|    view|1610494094750|    11|
|green|addtobag|1510593114350|    21|
|  red|   close|1610493115350|    41|
| blue|    view|1610494094350|    11|
| blue|   close|1510593114312|    21|
|  red|    view|1610493114350|    41|
|  red|    view|1610593114350|    41|
|green|purchase|1610494094350|    31|
+-----+--------+-------------+------+



In [None]:
#Gettheprevioustimestampforeachuserid
df=df.withColumn( "session_id", F.lag("timestamp").over(Window.partitionBy("userid").orderBy("timestamp")),)
df.show()

+-----+--------+-------------+------+-------------+
| item|   event|    timestamp|userId|   session_id|
+-----+--------+-------------+------+-------------+
| blue|    view|1610494094350|    11|         NULL|
| blue|    view|1610494094750|    11|1610494094350|
| blue|   close|1510593114312|    21|         NULL|
|green|addtobag|1510593114350|    21|1510593114312|
|green|purchase|1610494094350|    31|         NULL|
|  red|    view|1610493114350|    41|         NULL|
|  red|   close|1610493115350|    41|1610493114350|
|  red|    view|1610593114350|    41|1610493115350|
+-----+--------+-------------+------+-------------+



In [None]:

#Define if the session is the1 stone(more than1800 s after the previous one)
df1=df.withColumn(
"session_id",
F.when(F.col("timestamp")-F.col("session_id")<=1800,0).otherwise(1),
)
df1.show()

+-----+--------+-------------+------+----------+
| item|   event|    timestamp|userId|session_id|
+-----+--------+-------------+------+----------+
| blue|    view|1610494094350|    11|         1|
| blue|    view|1610494094750|    11|         0|
| blue|   close|1510593114312|    21|         1|
|green|addtobag|1510593114350|    21|         0|
|green|purchase|1610494094350|    31|         1|
|  red|    view|1610493114350|    41|         1|
|  red|   close|1610493115350|    41|         0|
|  red|    view|1610593114350|    41|         1|
+-----+--------+-------------+------+----------+



In [None]:
#create a unique session id per session(same id can exists for different users)
df2=df1.withColumn(
"session_id",
F.sum("session_id").over(Window.partitionBy("userid").orderBy("timestamp")),
)
df2.show()


+-----+--------+-------------+------+----------+
| item|   event|    timestamp|userId|session_id|
+-----+--------+-------------+------+----------+
| blue|    view|1610494094350|    11|         1|
| blue|    view|1610494094750|    11|         1|
| blue|   close|1510593114312|    21|         1|
|green|addtobag|1510593114350|    21|         1|
|green|purchase|1610494094350|    31|         1|
|  red|    view|1610493114350|    41|         1|
|  red|   close|1610493115350|    41|         1|
|  red|    view|1610593114350|    41|         2|
+-----+--------+-------------+------+----------+



In [None]:
#create a unique session id per session per user
df3=df2.withColumn(
"session_id",F.dense_rank().over(Window.orderBy("userid","session_id"))
)

df3.show()

+-----+--------+-------------+------+----------+
| item|   event|    timestamp|userId|session_id|
+-----+--------+-------------+------+----------+
| blue|    view|1610494094350|    11|         1|
| blue|    view|1610494094750|    11|         1|
| blue|   close|1510593114312|    21|         2|
|green|addtobag|1510593114350|    21|         2|
|green|purchase|1610494094350|    31|         3|
|  red|    view|1610493114350|    41|         4|
|  red|   close|1610493115350|    41|         4|
|  red|    view|1610593114350|    41|         5|
+-----+--------+-------------+------+----------+



In [None]:
df=spark.createDataFrame([
("blue","view",1610494094750,11),
("green","addtobag",1510593114350,21),
("red","close",1610493115350,41),
("blue","view",1610494094350,11),
("blue","close",1510593114312,21),
("red","view",1610493114350,41),
("red","view",1610593114350,41),
("green","purchase",1610494094350,31)
],["item","event","timestamp","userId"])

In [None]:
df = spark.createDataFrame([('Alice', 1), ('Bob', 2), ('Carol', 3)], ['name', 'age'])

# Group the data by the "name" column
grouped_df = df.groupBy('name')

+------+
|userId|
+------+
|    31|
|    41|
|    11|
|    21|
+------+

