In [17]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder .appName("WordCountApp") .getOrCreate()



In [18]:
import os
print(os.getcwd())


c:\Users\neeli\spark


In [19]:
rdd = spark.sparkContext.textFile("sample.txt")


In [20]:
import os
print("sample.txt exists:", os.path.isfile("sample.txt"))


sample.txt exists: True


In [21]:
print(os.listdir())


['.ipynb_checkpoints', 'data.csv', 'output.avro', 'output.json', 'output.parquet', 'output.txt', 'sample.csv', 'sample.json', 'sample.txt', 'spark.ipynb', 'spark310env', 'venv', 'wordcount.py']


In [22]:
rdd = spark.sparkContext.textFile("C:/Users/neeli/spark/sample.txt")


In [23]:

word_counts = rdd.flatMap(lambda line: line.split()).map(lambda word: (word.lower(), 1)).reduceByKey(lambda a, b: a + b)


for word, count in word_counts.collect():
    print(f"{word}: {count}")


IllegalArgumentException: Pathname /C:/Users/neeli/spark/sample.txt from hdfs://localhost:9000/C:/Users/neeli/spark/sample.txt is not a valid DFS filename.

In [11]:
rdd = spark.sparkContext.textFile("sample.csv")

In [12]:
word_counts = rdd.flatMap(lambda line: line.split()) .map(lambda word: (word.strip().lower(), 1)) .reduceByKey(lambda a, b: a + b)

for word, count in word_counts.collect():
    print(f"{word}: {count}")

world: 1
is: 1
hello: 2
spark: 2
fast: 1


In [13]:
df = spark.read.json("sample.json")

In [14]:
df = spark.read.json("sample.json")

if df.count() > 0:
    if 'text' in df.columns:
        rdd = df.filter(df['text'].isNotNull()).rdd.flatMap(lambda row: str(row['text']).split()) .map(lambda word: (word.lower().strip(), 1)) .reduceByKey(lambda a, b: a + b)
        
        
        for word, count in rdd.collect():
            print(f"{word}: {count}")
    else:
        print("The 'text' column does not exist in the DataFrame.")
else:                  
    print("The DataFrame is empty!")


hello: 2
world: 1
spark: 2
is: 1
fast: 1


In [15]:
spark

In [2]:
import pandas as pd
df = pd.read_csv('data.csv')
print(df.head())


     hello world
0    hello spark
1  spark is fast


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Test").getOrCreate()
spark_df = spark.read.csv("data.csv", header=True, inferSchema=True)
spark_df.show()


+-------------+
|  hello world|
+-------------+
|  hello spark|
|spark is fast|
+-------------+



In [None]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from fastavro import writer, parse_schema


df = pd.read_csv("data.csv")


df.to_json("output.json", orient="records", lines=True)

table = pa.Table.from_pandas(df)
pq.write_table(table, "output.parquet")


df.to_csv("output.txt", index=False, header=False)


avro_schema = {
    "type": "record",
    "name": "Record",
    "fields": [{"name": col, "type": "string"} for col in df.columns]
}
records = df.astype(str).to_dict(orient="records")
parsed_schema = parse_schema(avro_schema)

with open("output.avro", "wb") as f:
    writer(f, parsed_schema, records)

print(" CSV successfully written to JSON, Parquet, Avro, and Text!")


✅ CSV successfully written to JSON, Parquet, Avro, and Text!


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("CSV to JSON, AVRO, TEXT") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.3.0") \
    .getOrCreate()


In [14]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Read CSV from HDFS") .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000")  .getOrCreate()
csv_path = "hdfs://localhost:9000/user/hadoop/inputdata/data.csv"
df = spark.read.option("header", "true").csv(csv_path)
df.show()


+-------------+
|  hello world|
+-------------+
|  hello spark|
|spark is fast|
+-------------+



In [None]:
df.write.mode("overwrite").json("hdfs://localhost:9000/user/hadoop/fileoutput/json_output")
from pyspark.sql.functions import concat_ws
df_text = df.select(concat_ws(", ", *df.columns).alias("value"))
df_text.write.mode("overwrite").text("hdfs://localhost:9000/user/hadoop/fileoutput/text_output")


In [28]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Read covid csv from HDFS") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .getOrCreate()

csv_path = "hdfs://localhost:9000/user/hadoop/covid/country_wise_latest (1).csv"

df = spark.read.option("header", "true").option("inferSchema", "true").csv(csv_path)

df.createOrReplaceTempView("covid_data")

result = spark.sql("""
    SELECT `Country/Region`, Confirmed, Deaths, Recovered, Active
    FROM covid_data
    where `Country/Region` = 'India'
""")


result.show()


+--------------+---------+------+---------+------+
|Country/Region|Confirmed|Deaths|Recovered|Active|
+--------------+---------+------+---------+------+
|         India|  1480073| 33408|   951166|495499|
+--------------+---------+------+---------+------+



In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Top 10 Countries by COVID-19 Recovery Rates") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .getOrCreate()

csv_path = "hdfs://localhost:9000/user/hadoop/covid/full_grouped.csv"

df = spark.read.option("header", "true").option("inferSchema", "true").csv(csv_path)

df.createOrReplaceTempView("full_grouped_data")

result = spark.sql("""
    SELECT `Country/Region`, 
           MAX(Confirmed) AS Total_Confirmed,
           MAX(Recovered) AS Total_Recovered,
           ROUND(MAX(Recovered) / MAX(Confirmed) * 100, 2) AS Recovery_Rate_Percent
    FROM full_grouped_data
    WHERE Confirmed > 0
    GROUP BY `Country/Region`
    ORDER BY Recovery_Rate_Percent DESC
    LIMIT 10
""")

result.show()


+--------------+---------------+---------------+---------------------+
|Country/Region|Total_Confirmed|Total_Recovered|Recovery_Rate_Percent|
+--------------+---------------+---------------+---------------------+
|      Dominica|             18|             18|                100.0|
|       Grenada|             23|             23|                100.0|
|      Holy See|             12|             12|                100.0|
|      Djibouti|           5059|           4977|                98.38|
|       Iceland|           1854|           1823|                98.33|
|        Brunei|            141|            138|                97.87|
|   New Zealand|           1557|           1514|                97.24|
|         Qatar|         109597|         106328|                97.02|
|      Malaysia|           8904|           8601|                 96.6|
| Liechtenstein|             86|             83|                96.51|
+--------------+---------------+---------------+---------------------+



In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Read COVID CSV Files from HDFS") \
    .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9000") \
    .getOrCreate()

df_country_latest = spark.read.option("header", "true").option("inferSchema", "true")  .csv("hdfs://localhost:9000/user/hadoop/covid/country_wise_latest (1).csv")

df_covid_clean = spark.read.option("header", "true").option("inferSchema", "true") .csv("hdfs://localhost:9000/user/hadoop/covid/covid_19_clean_complete.csv")

df_day_wise = spark.read.option("header", "true").option("inferSchema", "true") .csv("hdfs://localhost:9000/user/hadoop/covid/day_wise.csv")

df_full_grouped = spark.read.option("header", "true").option("inferSchema", "true") .csv("hdfs://localhost:9000/user/hadoop/covid/full_grouped.csv")

df_usa_county = spark.read.option("header", "true").option("inferSchema", "true") .csv("hdfs://localhost:9000/user/hadoop/covid/usa_county_wise.csv")

df_worldometer = spark.read.option("header", "true").option("inferSchema", "true") .csv("hdfs://localhost:9000/user/hadoop/covid/worldometer_data.csv")

df_country_latest.show()
df_covid_clean.show()
df_day_wise.show()
df_full_grouped.show()
df_usa_county.show()
df_worldometer.show()


+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           6

In [None]:
df_country_latest.createOrReplaceTempView("country_latest")

result = spark.sql("""
    SELECT `Country/Region`, 
           ROUND(`Recovered / 100 Cases`, 2) AS Recovery_Rate,
           ROUND(`Deaths / 100 Cases`, 2) AS Fatality_Rate
    FROM country_latest
    WHERE `Confirmed` > 0
    ORDER BY Recovery_Rate DESC
""")

result.show()

+--------------+-------------+-------------+
|Country/Region|Recovery_Rate|Fatality_Rate|
+--------------+-------------+-------------+
|      Dominica|        100.0|          0.0|
|       Grenada|        100.0|          0.0|
|      Holy See|        100.0|          0.0|
|      Djibouti|        98.38|         1.15|
|       Iceland|        98.33|         0.54|
|        Brunei|        97.87|         2.13|
|   New Zealand|        97.24|         1.41|
|         Qatar|        97.02|         0.15|
|      Malaysia|         96.6|         1.39|
|     Mauritius|        96.51|         2.91|
|        Norway|        95.84|         2.79|
|       Taiwan*|        95.24|         1.52|
|          Laos|         95.0|          0.0|
|         Malta|        94.86|         1.28|
|       Estonia|        94.54|         3.39|
|      Thailand|        94.36|         1.76|
| Liechtenstein|        94.19|         1.16|
|    San Marino|        93.99|         6.01|
|       Finland|        93.54|         4.45|
|     Gree

In [43]:
df_country_latest.createOrReplaceTempView("country_latest")

result=spark.sql("""
SELECT `Country/Region`,Deaths
from country_latest
ORDER BY Deaths 
                 
""")

result.show()


+--------------------+------+
|      Country/Region|Deaths|
+--------------------+------+
|              Bhutan|     0|
|            Cambodia|     0|
|            Dominica|     0|
|             Eritrea|     0|
|                Fiji|     0|
|           Greenland|     0|
|             Grenada|     0|
|            Holy See|     0|
|                Laos|     0|
|            Mongolia|     0|
|    Papua New Guinea|     0|
|Saint Kitts and N...|     0|
|         Saint Lucia|     0|
|Saint Vincent and...|     0|
|          Seychelles|     0|
|         Timor-Leste|     0|
|             Vietnam|     0|
|             Burundi|     1|
|       Liechtenstein|     1|
|      Western Sahara|     1|
+--------------------+------+
only showing top 20 rows



In [44]:
df_worldometer.createOrReplaceTempView("country_latest")

result = spark.sql("""
   SELECT `Country/Region`,TotalCases
from country_latest
ORDER BY TotalCases desc
""")

result.show()

+--------------+----------+
|Country/Region|TotalCases|
+--------------+----------+
|           USA|   5032179|
|        Brazil|   2917562|
|         India|   2025409|
|        Russia|    871894|
|  South Africa|    538184|
|        Mexico|    462690|
|          Peru|    455409|
|         Chile|    366671|
|      Colombia|    357710|
|         Spain|    354530|
|          Iran|    320117|
|            UK|    308134|
|  Saudi Arabia|    284226|
|      Pakistan|    281863|
|    Bangladesh|    249651|
|         Italy|    249204|
|        Turkey|    237265|
|     Argentina|    228195|
|       Germany|    215210|
|        France|    195633|
+--------------+----------+
only showing top 20 rows



In [51]:
df_worldometer.createOrReplaceTempView("country_latest")

result = spark.sql("""
    SELECT 
        ROUND((SUM(TotalRecovered) / SUM(TotalCases)) * 100, 2) AS Global_Recovery_Rate
    FROM country_latest
    WHERE TotalCases > 0
""")
result.show()



+--------------------+
|Global_Recovery_Rate|
+--------------------+
|               62.97|
+--------------------+



In [56]:
df_worldometer.createOrReplaceTempView("country_latest")

result = spark.sql("""
    SELECT 
        Continent,
        SUM(TotalCases) AS Total_Confirmed,
        SUM(TotalDeaths) AS Total_Deaths,
        SUM(TotalRecovered) AS Total_Recovered,
        SUM(ActiveCases) AS Total_Active,
        ROUND((SUM(TotalRecovered) / SUM(TotalCases)) * 100, 2) AS Recovery_Rate,
        ROUND((SUM(TotalDeaths) / SUM(TotalCases)) * 100, 2) AS Fatality_Rate
    FROM country_latest
    WHERE TotalCases > 0
    GROUP BY Continent
    ORDER BY Total_Confirmed DESC
""")

result.show()


+-----------------+---------------+------------+---------------+------------+-------------+-------------+
|        Continent|Total_Confirmed|Total_Deaths|Total_Recovered|Total_Active|Recovery_Rate|Fatality_Rate|
+-----------------+---------------+------------+---------------+------------+-------------+-------------+
|    North America|        5919209|      229855|        3151678|     2537676|        53.24|         3.88|
|             Asia|        4689794|      100627|        3508170|     1080997|         74.8|         2.15|
|    South America|        4543273|      154885|        3116150|     1272238|        68.59|         3.41|
|           Europe|        2982576|      205232|        1587302|      475261|        53.22|         6.88|
|           Africa|        1011867|       22114|         693620|      296133|        68.55|         2.19|
|Australia/Oceania|          21735|         281|          12620|        8834|        58.06|         1.29|
|             NULL|            712|          1

In [60]:
df_worldometer.createOrReplaceTempView("country_latest")

result = spark.sql("""
    SELECT 
        Continent,
        SUM(TotalCases) AS Total_Confirmed
    FROM country_latest
    WHERE TotalCases > 0 AND Continent IS NOT NULL
    GROUP BY Continent
    ORDER BY Total_Confirmed ASC
    LIMIT 1
""")

result.show()


+-----------------+---------------+
|        Continent|Total_Confirmed|
+-----------------+---------------+
|Australia/Oceania|          21735|
+-----------------+---------------+



In [61]:
df_worldometer.createOrReplaceTempView("country_latest")

result = spark.sql("""
    SELECT 
        Continent,
        SUM(TotalRecovered) AS Total_Recovered,
        SUM(TotalCases) AS Total_Cases,
        ROUND((SUM(TotalRecovered) / SUM(TotalCases)) * 100, 2) AS Recovery_Rate
    FROM country_latest
    WHERE TotalCases > 0 AND Continent IS NOT NULL
    GROUP BY Continent
    ORDER BY Recovery_Rate DESC
""")

result.show()


+-----------------+---------------+-----------+-------------+
|        Continent|Total_Recovered|Total_Cases|Recovery_Rate|
+-----------------+---------------+-----------+-------------+
|             Asia|        3508170|    4689794|         74.8|
|    South America|        3116150|    4543273|        68.59|
|           Africa|         693620|    1011867|        68.55|
|Australia/Oceania|          12620|      21735|        58.06|
|    North America|        3151678|    5919209|        53.24|
|           Europe|        1587302|    2982576|        53.22|
+-----------------+---------------+-----------+-------------+



In [None]:
df_country_latest.createOrReplaceTempView("country_latest")
df_worldometer.createOrReplaceTempView("worldometer")
result = spark.sql("""
    WITH joined_data AS (
        SELECT 
            cl.`Country/Region` AS Country,
            wm.TotalCases,
            wm.TotalDeaths
        FROM country_latest cl
        JOIN worldometer wm
        ON cl.`Country/Region` = wm.`Country/Region`
        WHERE wm.TotalCases > 0 AND wm.TotalDeaths IS NOT NULL
    )

    SELECT 
        Country,
        TotalCases,
        TotalDeaths,
        ROUND((TotalDeaths / TotalCases) * 100, 2) AS Local_Death_Percentage,
        ROUND((SUM(TotalDeaths) OVER () / SUM(TotalCases) OVER ()) * 100, 2) AS Global_Death_Percentage
    FROM joined_data
    ORDER BY Local_Death_Percentage DESC
""")

result.show(10)

+--------------+----------+-----------+----------------------+-----------------------+
|       Country|TotalCases|TotalDeaths|Local_Death_Percentage|Global_Death_Percentage|
+--------------+----------+-----------+----------------------+-----------------------+
|         Yemen|      1768|        508|                 28.73|                   3.67|
|        France|    195633|      30312|                 15.49|                   3.67|
|         Italy|    249204|      35187|                 14.12|                   3.67|
|       Belgium|     71158|       9859|                 13.86|                   3.67|
|       Hungary|      4597|        600|                 13.05|                   3.67|
|        Mexico|    462690|      50517|                 10.92|                   3.67|
|   Netherlands|     56982|       6153|                  10.8|                   3.67|
|Western Sahara|        10|          1|                  10.0|                   3.67|
|          Chad|       942|         76|    