### Pyspark Session 

In [3]:
# SparkSession

from pyspark.sql import SparkSession
#importing pyspark functions
from pyspark.sql.functions import col,isnan, when, count,sum
import os
import sys
# Creating SparkSession 
# Creating SparkSession 
# building session
#setting execution memory size 4gb
# setting driver memory size 4gb
# setting heap size 50gb
#enabling spark executor 
# selecting all cores to use
# setting the pyspark app name
# create session or get it if it exists
spark = SparkSession.builder \
.config('spark.executer.memory','4g') \
.config('spark.driver.memory','4g') \
.config('spark.offHeap.size','50g') \
.config('spark.executer.enabled','true') \
.appName("bigdata")\
.getOrCreate();

In [4]:
# https://www.kaggle.com/datasets/ravindrasinghrana/job-description-dataset
# 1. Read a CSV and print output to console
df_csv = spark.read.csv("F:/data/job_descriptions.csv")
df_csv.show(5)

+----------------+-------------+--------------+------------+----------+----------------+--------+---------+---------+------------+----------------+----------+------------------+--------------------+--------------------+--------------------+------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             _c0|          _c1|           _c2|         _c3|       _c4|             _c5|     _c6|      _c7|      _c8|         _c9|            _c10|      _c11|              _c12|                _c13|                _c14|                _c15|        _c16|                _c17|                _c18|                _c19|                _c20|                _c21|                _c22|
+----------------+-------------+--------------+------------+----------+----------------+--------+---------+---------+------------+----------------+----------+------------------+--------------------+--------------------+--------------------+

In [5]:
# 2. Read a JSON file and print output to console
#https://www.kaggle.com/datasets/prathamsharma123/farmers-protest-tweets-dataset-raw-json
df_json = spark.read.json("F:/data/farmers-protest-tweets-2021-03-5.json")
df_json.show(5)


+--------------------+-------------------+--------------------+-------------------+----+---------+--------------------+--------------+--------+----------+-----------+--------------------+----------+------------+--------------+--------------------+-------------------+--------------------+-----------+--------------------+--------------------+
|             content|     conversationId|                date|                 id|lang|likeCount|               media|mentionedUsers|outlinks|quoteCount|quotedTweet|     renderedContent|replyCount|retweetCount|retweetedTweet|              source|        sourceLabel|           sourceUrl|tcooutlinks|                 url|                user|
+--------------------+-------------------+--------------------+-------------------+----+---------+--------------------+--------------+--------+----------+-----------+--------------------+----------+------------+--------------+--------------------+-------------------+--------------------+-----------+--------------

In [6]:
# https://www.kaggle.com/datasets/uadithyan/sleep-states-dataset

# 3 Read parquet file and print output to console
df_parquet = spark.read.parquet("F:/data/CMI_sleep_data_updated.parquet")
df_parquet.show(5)

+------------+----+--------------------+--------+------+-----+-----------------+
|   series_id|step|           timestamp|  anglez|  enmo|event|__index_level_0__|
+------------+----+--------------------+--------+------+-----+-----------------+
|08db4255286f|   0|2018-11-05T10:00:...|-30.8453|0.0447| NULL|                0|
|08db4255286f|   1|2018-11-05T10:00:...|-34.1818|0.0443| NULL|                1|
|08db4255286f|   2|2018-11-05T10:00:...|-33.8771|0.0483| NULL|                2|
|08db4255286f|   3|2018-11-05T10:00:...|-34.2821| 0.068| NULL|                3|
|08db4255286f|   4|2018-11-05T10:00:...|-34.3858|0.0768| NULL|                4|
+------------+----+--------------------+--------+------+-----+-----------------+
only showing top 5 rows



In [44]:
df = spark.read.format('avro').load("F:/data/userdata1.avro")

AnalysisException: Failed to find data source: avro. Avro is built-in but external data source module since Spark 2.4. Please deploy the application as per the deployment section of Apache Avro Data Source Guide.

In [11]:
# 5 Example for broadcast join (Inner join 2 dataframes)
from pyspark.sql.functions import broadcast
# https://github.com/MainakRepositor/Datasets/tree/master/Elon%20Tweets

# Reading 2020.csv into a DataFrame
df_2020Tweets = spark.read.csv("F:/data/2020.csv", header=True, inferSchema=True)

# Reading 2021.csv into a DataFrame
df_2021Tweets = spark.read.csv("F:/data/2021.csv", header=True, inferSchema=True)
df_2021Tweets.show(5)



+---+-------------------+-------------------+--------------+-------------------+--------+-----+--------------------+--------+--------+--------+--------+-----------+--------+---------+---+----+--------------------+----+------+-----+---------+-------+------+--------+---------+---------+------+----+----+------+----------+-------+----------+--------------------+------------+---------+---------+----------+
|_c0|                 id|    conversation_id|    created_at|               date|timezone|place|               tweet|language|hashtags|cashtags| user_id|user_id_str|username|     name|day|hour|                link|urls|photos|video|thumbnail|retweet|nlikes|nreplies|nretweets|quote_url|search|near| geo|source|user_rt_id|user_rt|retweet_id|            reply_to|retweet_date|translate|trans_src|trans_dest|
+---+-------------------+-------------------+--------------+-------------------+--------+-----+--------------------+--------+--------+--------+--------+-----------+--------+---------+---+---

In [17]:
# broadcast join
joined_df = df_2020Tweets.join(broadcast(df_2021Tweets), df_2020Tweets["id"] == df_2021Tweets["id"], "inner")

# columns join
result_df = joined_df.select(df_2020Tweets["*"], df_2021Tweets["tweet"])

# Show DataFrame
result_df.show(5)


+---+-------------------+-------------------+--------------+-------------------+--------+-----+--------------------+--------+--------+--------+--------+-----------+--------+---------+---+----+--------------------+----+------+-----+---------+-------+------+--------+---------+---------+------+----+----+------+----------+-------+----------+--------------------+------------+---------+---------+----------+--------------------+
|_c0|                 id|    conversation_id|    created_at|               date|timezone|place|               tweet|language|hashtags|cashtags| user_id|user_id_str|username|     name|day|hour|                link|urls|photos|video|thumbnail|retweet|nlikes|nreplies|nretweets|quote_url|search|near| geo|source|user_rt_id|user_rt|retweet_id|            reply_to|retweet_date|translate|trans_src|trans_dest|               tweet|
+---+-------------------+-------------------+--------------+-------------------+--------+-----+--------------------+--------+--------+--------+-----

In [22]:
# 6. Example for Filtering the data 
filtered_df = df_parquet.filter(df_parquet["enmo"] > 0.09)
filtered_df.show()


+------------+----+--------------------+--------+------+-----+-----------------+
|   series_id|step|           timestamp|  anglez|  enmo|event|__index_level_0__|
+------------+----+--------------------+--------+------+-----+-----------------+
|08db4255286f|   6|2018-11-05T10:00:...|-30.5134|0.1073| NULL|                6|
|08db4255286f|  40|2018-11-05T10:03:...|-31.4288|0.0949| NULL|               40|
|08db4255286f|  42|2018-11-05T10:03:...|-28.9712|0.1037| NULL|               42|
|08db4255286f|  45|2018-11-05T10:03:...|-26.3368|0.0952| NULL|               45|
|08db4255286f|  46|2018-11-05T10:03:...|-27.0645|0.1327| NULL|               46|
|08db4255286f|  47|2018-11-05T10:03:...|-29.9288| 0.198| NULL|               47|
|08db4255286f|  48|2018-11-05T10:04:...|-29.0845| 0.144| NULL|               48|
|08db4255286f|  49|2018-11-05T10:04:...|-29.3285|0.1032| NULL|               49|
|08db4255286f|  50|2018-11-05T10:04:...|-27.6283|0.1116| NULL|               50|
|08db4255286f|  52|2018-11-0

In [23]:
filtered_df1 = df_parquet.filter(df_parquet["step"] > 50)
filtered_df1.show()

+------------+----+--------------------+--------+------+-----+-----------------+
|   series_id|step|           timestamp|  anglez|  enmo|event|__index_level_0__|
+------------+----+--------------------+--------+------+-----+-----------------+
|08db4255286f|  51|2018-11-05T10:04:...|-25.5639|0.0806| NULL|               51|
|08db4255286f|  52|2018-11-05T10:04:...|-25.8617|0.1314| NULL|               52|
|08db4255286f|  53|2018-11-05T10:04:...|-25.0372|0.1681| NULL|               53|
|08db4255286f|  54|2018-11-05T10:04:...|-25.6563|0.1173| NULL|               54|
|08db4255286f|  55|2018-11-05T10:04:...| -23.548|0.0768| NULL|               55|
|08db4255286f|  56|2018-11-05T10:04:...|-24.7306|0.0833| NULL|               56|
|08db4255286f|  57|2018-11-05T10:04:...| -24.477|0.0653| NULL|               57|
|08db4255286f|  58|2018-11-05T10:04:...|-24.1588|0.0555| NULL|               58|
|08db4255286f|  59|2018-11-05T10:04:...|-24.4506|0.0493| NULL|               59|
|08db4255286f|  60|2018-11-0

In [24]:
# 7. Example for applying aggregate functions like  max, min, avg 
from pyspark.sql.functions import max, min,avg

df_parquet.agg(max("enmo"), min("enmo"), avg("enmo")).show()

 

+---------+---------+-------------------+
|max(enmo)|min(enmo)|          avg(enmo)|
+---------+---------+-------------------+
|   7.0161|      0.0|0.04428616508921974|
+---------+---------+-------------------+



In [26]:
# 8. Example for Read json file with typed schema (without infering the schema) with Structtype, StructField .....
from pyspark.sql.types import StructType, StructField, StringType,IntegerType,DateType

# Define the schema
schema = StructType([
    StructField("summary", StringType(), True),
    StructField("content", StringType(), True),
    StructField("conversationId", StringType(), True),
    StructField("date", StringType(), True),
    StructField("id", StringType(), True),
    StructField("lang", StringType(), True),
    StructField("likeCount", StringType(), True),
    StructField("quoteCount", StringType(), True),
    StructField("renderedContent", StringType(), True),
    StructField("replyCount", StringType(), True),
    StructField("retweetCount", StringType(), True),
    StructField("retweetedTweet", StringType(), True),
    StructField("source", StringType(), True),
    StructField("sourceLabel", StringType(), True),
    StructField("sourceUrl", StringType(), True),
    StructField("url", StringType(), True)
])

# Read the JSON file with the specified schema
df_json_defined_schema = spark.read.schema(schema).json("F:/data/farmers-protest-tweets-2021-03-5.json")

# Show the DataFrame
df_json_defined_schema.show(5)


+-------+--------------------+-------------------+--------------------+-------------------+----+---------+----------+--------------------+----------+------------+--------------+--------------------+-------------------+--------------------+--------------------+
|summary|             content|     conversationId|                date|                 id|lang|likeCount|quoteCount|     renderedContent|replyCount|retweetCount|retweetedTweet|              source|        sourceLabel|           sourceUrl|                 url|
+-------+--------------------+-------------------+--------------------+-------------------+----+---------+----------+--------------------+----------+------------+--------------+--------------------+-------------------+--------------------+--------------------+
|   NULL|Support 👇\n\n#Fa...|1376739399593910273|2021-03-30T03:33:...|1376739399593910273|  en|        0|         0|Support 👇\n\n#Fa...|         0|           0|          NULL|<a href="http://t...|Twitter for Android|h

In [34]:
# 9. Example for increase and decrease number of dataframe partitions 
# Increase partitions
df_parquet_increase = df_parquet.repartition(15)
df_parquet_increase.show() # show dataframe 



+------------+------+--------------------+--------+------+-----+-----------------+
|   series_id|  step|           timestamp|  anglez|  enmo|event|__index_level_0__|
+------------+------+--------------------+--------+------+-----+-----------------+
|18b61dd5aae8|351840|2018-01-12T01:10:...| 23.8277|   0.0| NULL|           351922|
|89bd631d1769|381941|2018-02-07T19:28:...|-61.4476|  0.01| NULL|           382028|
|7822ee8fe3ec|393653|2018-09-13T10:59:...|-16.8911|0.0622| NULL|           393740|
|67f5fc60e494|235083|2017-11-20T02:45:...|  86.529|   0.0| NULL|           235143|
|d5e47b94477e|  6248|2017-11-10T03:40:...| 64.6929|5.0E-4| NULL|             6248|
|c68260cc9e8f|136622|2017-10-25T10:15:...|-17.2825|0.0071| NULL|           136657|
|7822ee8fe3ec|268128|2018-09-06T04:39:...| -9.6864|   0.0| NULL|           268196|
|d5e47b94477e|306182|2017-11-27T11:15:...| -13.269|7.0E-4| NULL|           306259|
|72bbd1ac3edf|388907|2017-10-12T00:53:...|-16.3899|0.0077| NULL|           388994|
|89b

In [35]:
# Decrease partitions
df_parquet_decrease = df_parquet.coalesce(2)
df_parquet_decrease.show()  # show dataframe

+------------+----+--------------------+--------+------+-----+-----------------+
|   series_id|step|           timestamp|  anglez|  enmo|event|__index_level_0__|
+------------+----+--------------------+--------+------+-----+-----------------+
|08db4255286f|   0|2018-11-05T10:00:...|-30.8453|0.0447| NULL|                0|
|08db4255286f|   1|2018-11-05T10:00:...|-34.1818|0.0443| NULL|                1|
|08db4255286f|   2|2018-11-05T10:00:...|-33.8771|0.0483| NULL|                2|
|08db4255286f|   3|2018-11-05T10:00:...|-34.2821| 0.068| NULL|                3|
|08db4255286f|   4|2018-11-05T10:00:...|-34.3858|0.0768| NULL|                4|
|08db4255286f|   5|2018-11-05T10:00:...|-34.9256|0.0511| NULL|                5|
|08db4255286f|   6|2018-11-05T10:00:...|-30.5134|0.1073| NULL|                6|
|08db4255286f|   7|2018-11-05T10:00:...|-30.5094|0.0649| NULL|                7|
|08db4255286f|   8|2018-11-05T10:00:...|-32.8806|0.0485| NULL|                8|
|08db4255286f|   9|2018-11-0

In [27]:
# 10. Example for renaming the column of the dataframe 

df_parquet_rename = df_parquet.withColumnRenamed("step", "step_renamed") # renaming column
df_parquet_rename.show()  # show dataframe

+------------+------------+--------------------+--------+------+-----+-----------------+
|   series_id|step_renamed|           timestamp|  anglez|  enmo|event|__index_level_0__|
+------------+------------+--------------------+--------+------+-----+-----------------+
|08db4255286f|           0|2018-11-05T10:00:...|-30.8453|0.0447| NULL|                0|
|08db4255286f|           1|2018-11-05T10:00:...|-34.1818|0.0443| NULL|                1|
|08db4255286f|           2|2018-11-05T10:00:...|-33.8771|0.0483| NULL|                2|
|08db4255286f|           3|2018-11-05T10:00:...|-34.2821| 0.068| NULL|                3|
|08db4255286f|           4|2018-11-05T10:00:...|-34.3858|0.0768| NULL|                4|
|08db4255286f|           5|2018-11-05T10:00:...|-34.9256|0.0511| NULL|                5|
|08db4255286f|           6|2018-11-05T10:00:...|-30.5134|0.1073| NULL|                6|
|08db4255286f|           7|2018-11-05T10:00:...|-30.5094|0.0649| NULL|                7|
|08db4255286f|       

In [28]:
# 11. Example for adding a new column to the dataframe 

df_parquet_addcolumn = df_parquet
df_parquet_addcolumn = df_parquet.withColumn("new_enmo_column_percent", col("enmo") * 100) # new enmo percentange column 
df_parquet_addcolumn.show()  # show dataframe

+------------+----+--------------------+--------+------+-----+-----------------+-----------------------+
|   series_id|step|           timestamp|  anglez|  enmo|event|__index_level_0__|new_enmo_column_percent|
+------------+----+--------------------+--------+------+-----+-----------------+-----------------------+
|08db4255286f|   0|2018-11-05T10:00:...|-30.8453|0.0447| NULL|                0|              4.4700003|
|08db4255286f|   1|2018-11-05T10:00:...|-34.1818|0.0443| NULL|                1|              4.4300003|
|08db4255286f|   2|2018-11-05T10:00:...|-33.8771|0.0483| NULL|                2|              4.8300004|
|08db4255286f|   3|2018-11-05T10:00:...|-34.2821| 0.068| NULL|                3|                    6.8|
|08db4255286f|   4|2018-11-05T10:00:...|-34.3858|0.0768| NULL|                4|              7.6800003|
|08db4255286f|   5|2018-11-05T10:00:...|-34.9256|0.0511| NULL|                5|                   5.11|
|08db4255286f|   6|2018-11-05T10:00:...|-30.5134|0.1073

In [49]:
# 12. Changing the structure of the dataframe
df_parquet.printSchema() # printing schema of the dataframe


root
 |-- series_id: string (nullable = true)
 |-- step: long (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- anglez: float (nullable = true)
 |-- enmo: float (nullable = true)
 |-- event: string (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [50]:
# droping columns
df_parquet_modified = df_parquet.drop("timestamp").drop("series_id") # droping 2 columns

# modify enmo from float to sring type
df_parquet_modified = df_parquet_modified.withColumn("enmo", col("enmo").cast(StringType())) # modify enmo int data type to string
df_parquet_modified.show()  # show dataframe


+----+--------+------+-----+-----------------+
|step|  anglez|  enmo|event|__index_level_0__|
+----+--------+------+-----+-----------------+
|   0|-30.8453|0.0447| NULL|                0|
|   1|-34.1818|0.0443| NULL|                1|
|   2|-33.8771|0.0483| NULL|                2|
|   3|-34.2821| 0.068| NULL|                3|
|   4|-34.3858|0.0768| NULL|                4|
|   5|-34.9256|0.0511| NULL|                5|
|   6|-30.5134|0.1073| NULL|                6|
|   7|-30.5094|0.0649| NULL|                7|
|   8|-32.8806|0.0485| NULL|                8|
|   9| -34.675|0.0462| NULL|                9|
|  10|-32.3365|0.0797| NULL|               10|
|  11|-31.3002|0.0719| NULL|               11|
|  12|-29.0589|0.0588| NULL|               12|
|  13|-28.2669|0.0581| NULL|               13|
|  14|-29.6412| 0.047| NULL|               14|
|  15|-24.5447|0.0401| NULL|               15|
|  16|-27.4409|0.0452| NULL|               16|
|  17|-26.9326|  0.04| NULL|               17|
|  18|-27.278

In [51]:
# Describe the DataFrame
df_parquet_modified.printSchema() # print schema

root
 |-- step: long (nullable = true)
 |-- anglez: float (nullable = true)
 |-- enmo: string (nullable = true)
 |-- event: string (nullable = true)
 |-- __index_level_0__: long (nullable = true)



In [22]:
# JSON data as provided (corrected format)
json_data = [
    '{"name" : "john doe", "dob" : "01-01-1970", "phone" : "+234567890", "salary" : 57000, "location" : "New York", "sex" : "male"}',
    '{"name" : "john adam", "dob" : "02-01-1990", "phone" : "+6634567890", "salary" : 69000, "location" : "Los Angeles", "sex" : "male"}',
    '{"name" : "jane frank", "dob" : "01-11-1999", "phone" : "+9876543210", "salary" : 70000, "location" : "Chicago", "sex" : "female"}',
    '{"name" : "alice hillary", "dob" : "01-12-1975","phone" :"+4534567890", "salary" : 80000, "location" : "Houston", "sex" : "female"}',
    '{"name" : "bob kim", "dob" : "01-01-1985", "phone" : "+5675555555", "salary" : 10000, "location" : "Phoenix", "sex" : "male"}',
    '{"name" : "felister malito", "dob" : "01-01-1995", "phone" : "+9234567890", "salary" : 65000, "location" : "San Francisco", "sex" : "male"}',
    '{"name" : "davison steve", "dob" : "05-01-1970", "phone" : "+299834444", "salary" : 75000, "location" : "Miami", "sex" : "male"}',
    '{"name" : "emmanuel pinner", "dob" : "06-01-1982", "phone" : "+541293333", "salary" : 82000, "location" : "Boston", "sex" : "female"}',
    '{"name" : "frank omega", "dob" : "01-08-1978", "phone" : "+5673885674", "salary" : 78000, "location" : "Denver", "sex" : "male"}',
    '{"name" : "gaddiom tom", "dob" : "01-09-1992", "phone" : "+6754377779", "salary" : 97000, "location" : "Seattle", "sex" : "female"}'
]

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_json, struct, lit

# Initialize Spark session
spark = SparkSession.builder \
    .appName("JSON") \
    .getOrCreate()



In [23]:
# Convert list to RDD

rdd = spark.sparkContext.parallelize(json_data) # rdd 

In [24]:
# Define schema for JSON
from pyspark.sql.types import StructType, StructField, StringType,IntegerType,DateType # importing packages
schema = StructType([
    StructField("name", StringType(), True),
    StructField("dob", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("salary", IntegerType(), True),
    StructField("location", StringType(), True),
    StructField("sex", StringType(), True)
])

In [25]:
# Convert RDD to DataFrame
rdddf = spark.read.json(rdd, schema=schema)


# Create a nested structure
from pyspark.sql.functions import struct
# Restructure DataFrame using struct function and to_json
df_restructured = rdddf.select(to_json(struct(
    struct(col("name"), col("dob"), col("phone")).alias("personal_data")
)).alias("json_string"))

In [26]:
df_restructured.show(truncate=False)

+-------------------------------------------------------------------------------------+
|json_string                                                                          |
+-------------------------------------------------------------------------------------+
|{"personal_data":{"name":"john doe","dob":"01-01-1970","phone":"+234567890"}}        |
|{"personal_data":{"name":"john adam","dob":"02-01-1990","phone":"+6634567890"}}      |
|{"personal_data":{"name":"jane frank","dob":"01-11-1999","phone":"+9876543210"}}     |
|{"personal_data":{"name":"alice hillary","dob":"01-12-1975","phone":"+4534567890"}}  |
|{"personal_data":{"name":"bob kim","dob":"01-01-1985","phone":"+5675555555"}}        |
|{"personal_data":{"name":"felister malito","dob":"01-01-1995","phone":"+9234567890"}}|
|{"personal_data":{"name":"davison steve","dob":"05-01-1970","phone":"+299834444"}}   |
|{"personal_data":{"name":"emmanuel pinner","dob":"06-01-1982","phone":"+541293333"}} |
|{"personal_data":{"name":"frank

In [27]:
# Define output path for JSON file
output_path = "personal_data.json"

# Write DataFrame to JSON file
df_restructured.write.json(output_path, mode="overwrite")