<a href="https://colab.research.google.com/github/gitanujjain/pyspark_practice/blob/main/handle_multi_delimiters.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar -xf spark-3.5.3-bin-hadoop3.tgz
!pip install -q findspark

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

In [2]:
import findspark

# Run findspark.init() ahead of the pyspark import below, as in the original
# cell order.
findspark.init()
from pyspark.sql import SparkSession

# Build (or reuse) a local-mode session named "Colab" with the driver settings
# configured below.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .config("spark.driver.memory", "8g")
    .config("spark.driver.cores", "4")
    .getOrCreate()
)

In [5]:
# Keep a handle on the underlying SparkContext and limit logging to errors.
sc = spark.sparkContext
sc.setLogLevel("ERROR")

In [10]:
from pyspark.sql.types import StructType,StringType, StructField, IntegerType, StringType, ArrayType


In [11]:
# Sample rows laid out as (ID, Name, Age, Marks); Marks packs three scores
# into a single pipe-delimited string.
data = [
    (1, "Arabinda", 23, "32|49|39"),
    (2, "Shyam", 34, "32|90|31"),
    (3, "Raghu", 42, "30|98|43"),
    (4, "John", 27, "43|87|56"),
    (5, "Su", 29, "65|76|29"),
    (6, "Manderic", 36, "89|45|90"),
]


In [12]:
# Explicit schema for the sample rows: only ID is declared non-nullable, and
# Marks is kept as a plain string at this stage.
schema = StructType(
    [
        StructField("ID", IntegerType(), nullable=False),
        StructField("Name", StringType(), nullable=True),
        StructField("Age", IntegerType(), nullable=True),
        StructField("Marks", StringType(), nullable=True),
    ]
)

In [13]:
# Materialise the sample rows as a DataFrame using the explicit schema.
df = spark.createDataFrame(data, schema=schema)

In [14]:
# Print the raw DataFrame; Marks is still a single pipe-delimited string.
df.show()

+---+--------+---+--------+
| ID|    Name|Age|   Marks|
+---+--------+---+--------+
|  1|Arabinda| 23|32|49|39|
|  2|   Shyam| 34|32|90|31|
|  3|   Raghu| 42|30|98|43|
|  4|    John| 27|43|87|56|
|  5|      Su| 29|65|76|29|
|  6|Manderic| 36|89|45|90|
+---+--------+---+--------+



In [15]:
import pyspark.sql.functions as f

# split() interprets its pattern as a regex, so the pipe has to be escaped.
# A raw string keeps the backslash without relying on the invalid "\|" Python
# escape sequence, which triggers a warning on recent interpreters.
# The column is referenced by its declared name "Marks" so the lookup does not
# depend on Spark's case-insensitive resolution.
df_split = df.withColumn("Marks", f.split(df["Marks"], r"\|"))

In [16]:
# Print the DataFrame after the split; Marks is now an array column.
df_split.show()

+---+--------+---+------------+
| ID|    Name|Age|       Marks|
+---+--------+---+------------+
|  1|Arabinda| 23|[32, 49, 39]|
|  2|   Shyam| 34|[32, 90, 31]|
|  3|   Raghu| 42|[30, 98, 43]|
|  4|    John| 27|[43, 87, 56]|
|  5|      Su| 29|[65, 76, 29]|
|  6|Manderic| 36|[89, 45, 90]|
+---+--------+---+------------+



In [17]:
# Use the longest Marks array across all rows instead of only inspecting the
# first row, so rows with extra entries are not silently truncated.
# The aggregate always returns one row; its value is None for an empty
# DataFrame, in which case we fall back to 0 instead of raising IndexError.
num_subjects = df_split.select(f.max(f.size("Marks"))).first()[0] or 0
print(num_subjects)

3


In [18]:
# One output column name per position in the Marks array: sub_0, sub_1, ...
column_name = [f"sub_{idx}" for idx in range(num_subjects)]
print(column_name)

['sub_0', 'sub_1', 'sub_2']


In [19]:
# Add every per-subject column in one projection. Calling withColumn() in a
# loop adds a projection per call and can produce very large query plans;
# withColumns() adds them all at once and, like withColumn(), replaces any
# existing column of the same name, so re-running this cell is harmless.
# Each new column takes the i-th element of the Marks array, cast to integer.
df_split = df_split.withColumns(
    {
        col_name: df_split["Marks"].getItem(i).cast(IntegerType())
        for i, col_name in enumerate(column_name)
    }
)

In [20]:
# Print the DataFrame with the new per-subject integer columns next to Marks.
df_split.show()

+---+--------+---+------------+-----+-----+-----+
| ID|    Name|Age|       Marks|sub_0|sub_1|sub_2|
+---+--------+---+------------+-----+-----+-----+
|  1|Arabinda| 23|[32, 49, 39]|   32|   49|   39|
|  2|   Shyam| 34|[32, 90, 31]|   32|   90|   31|
|  3|   Raghu| 42|[30, 98, 43]|   30|   98|   43|
|  4|    John| 27|[43, 87, 56]|   43|   87|   56|
|  5|      Su| 29|[65, 76, 29]|   65|   76|   29|
|  6|Manderic| 36|[89, 45, 90]|   89|   45|   90|
+---+--------+---+------------+-----+-----+-----+



In [21]:
# The array column has been unpacked into sub_* columns, so discard it and
# print the final layout.
df_split = df_split.drop("Marks")
df_split.show()

+---+--------+---+-----+-----+-----+
| ID|    Name|Age|sub_0|sub_1|sub_2|
+---+--------+---+-----+-----+-----+
|  1|Arabinda| 23|   32|   49|   39|
|  2|   Shyam| 34|   32|   90|   31|
|  3|   Raghu| 42|   30|   98|   43|
|  4|    John| 27|   43|   87|   56|
|  5|      Su| 29|   65|   76|   29|
|  6|Manderic| 36|   89|   45|   90|
+---+--------+---+-----+-----+-----+



In [22]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()