In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session named "practise" for the cells below.
spark = (
    SparkSession.builder
    .appName("practise")
    .getOrCreate()
)

In [None]:
# Alternative source, kept for reference:
# data = spark.read.option('header','True').csv('../Data/netflix_titles.csv',inferSchema=True)

# Read books.csv, taking column names from the header row and letting Spark
# infer the column types.
data = spark.read.csv(
    path="books.csv",
    header=True,
    inferSchema=True,
)
# data.printSchema(5)

# Last expression of the cell: displays the list of column names.
data.columns

In [None]:
# Preview only the id and title columns of the loaded frame.
data.select("book_id", "title").show()

In [None]:
# Two extra rows to append. Values are positional and are matched against the
# fields of data.schema; None marks a missing value.
new_row_1 = (17, "Imaginary", "unknown", "unknown", 2022, 100, None)
new_row_2 = (18, "Impossible", None, None, 2022, 50, None)

# Build a small frame with the same schema as `data`, then append it.
extra_rows_df = spark.createDataFrame([new_row_1, new_row_2], schema=data.schema)
df = data.union(extra_rows_df)

# Show the combined frame.
df.show()

In [None]:
# Rename the key column to "id", then add a "new_stock" column holding
# stock_quantity + 100.
renamed_df = df.withColumnRenamed("book_id", "id")
df = renamed_df.withColumn("new_stock", renamed_df.stock_quantity + 100)
df.show()

In [None]:
# Remove the "new_stock" column again and display the resulting frame.
without_new_stock = df.drop("new_stock")
df = without_new_stock
df.show()

In [None]:
# Dropping rows with missing values (alternative, kept for reference):
# df.na.drop (how='any',subset=['author_fname']).show()

# Filling missing values. The result is only displayed; `df` itself is not
# reassigned. NOTE(review): per the PySpark docs a string fill value is applied
# to string columns only, so nulls in numeric columns remain - confirm that is
# the intent.
filled_df = df.na.fill("Mising Values")
filled_df.show()

In [None]:
# df.filter((df['stock_quantity']<=100) & (df['released_year']>2000)).select(['title','stock_quantity','released_year']).show()
# df.groupBy('released_year').avg().show()

In [None]:
# Query to get 5 books with the largest page count
"""
SELECT book_id, title, pages FROM books
ORDER BY pages DESC
LIMIT 5;
"""

# Use limit() instead of head() + spark.createDataFrame(): the rows stay in
# Spark with their original schema, rather than being collected to the driver
# and having the schema re-inferred (which raises when no rows come back).
# `result` is therefore a DataFrame, not a list of Row objects.
result = df.orderBy(df.pages.desc()).select(["id", "title", "pages"]).limit(5)
result.show()

In [None]:
# Most recent 5 published books with page count more than 100
"""
SELECT title, released_year, pages FROM books
WHERE pages>100
ORDER BY released_year DESC
LIMIT 5;
"""
# Use limit() instead of head() + spark.createDataFrame(): the filter can match
# zero rows, and rebuilding a DataFrame from an empty list raises because no
# schema can be inferred. limit() keeps the original schema and avoids the
# round trip through the driver. `result` is therefore a DataFrame.
result = (
    df.filter(df.pages > 100)
    .orderBy(df.released_year.desc())
    .select(["title", "released_year", "pages"])
    .limit(5)
)
result.show()

In [None]:
# Group each author's books and total the number of pages they have published.
"""
SELECT author_fname, author_lname, SUM(pages) AS total_pages FROM books
GROUP BY author_fname, author_lname
ORDER BY total_pages DESC;
"""
from pyspark.sql import functions as F

# One row per (first name, last name) pair with the summed page count.
pages_per_author = df.groupBy("author_fname", "author_lname").agg(
    F.sum("pages").alias("total_pages")
)

# Largest totals first.
result_df = pages_per_author.orderBy(F.desc("total_pages"))
result_df.show()

In [None]:
# Keep books whose title has more than two space-separated words.
# The word count is computed as (number of space characters) + 1, where the
# number of spaces is the title length minus its length with spaces removed.
title_length = F.length("title")
title_length_without_spaces = F.length(F.regexp_replace("title", " ", ""))
word_count = title_length - title_length_without_spaces + 1

result_df = df.filter(word_count > 2).select("id", "title")

# Show the matching ids and titles.
result_df.show()

In [None]:
# Stop the current Spark session; the following cell obtains a session again
# through SparkSession.builder...getOrCreate().
spark.stop()

In [None]:
import os

from dotenv import load_dotenv
from pyspark.sql import SparkSession

# Load environment variables from .env file
load_dotenv()

# Get environment variables (each is None when the variable is not set).
DATABASE_URL = os.getenv("JDBC_URL")
# NOTE(review): many systems already export USER (the login name), and
# python-dotenv's load_dotenv() does not override existing variables by
# default, so a USER entry in .env may be ignored - confirm, and consider a
# dedicated variable name.
USER = os.getenv("USER")
PASSWORD = os.getenv("PASSWORD")

# Create a Spark session
spark = SparkSession.builder.appName("MyApp").getOrCreate()


# Define connection properties passed to the JDBC reader
properties = {"user": USER, "password": PASSWORD, "driver": "com.mysql.cj.jdbc.Driver"}

service = "amazon"
# Subquery used as the JDBC "table". It must be an f-string: without the prefix
# the literal text "{service}" is sent to the database instead of the table name.
table_name = f"(SELECT type FROM {service}) AS newTable"

# Read the subquery result from the MySQL database into a DataFrame
dataFrame = spark.read.jdbc(url=DATABASE_URL, table=table_name, properties=properties)

In [None]:
# Fetch show ids together with their "listed_in" value from the chosen
# service's table, passing an aliased subquery as the JDBC table.
service = "netflix"
query = f"(SELECT show_id,listed_in FROM {service}) AS newTable"

df_genres = spark.read.jdbc(DATABASE_URL, query, properties=properties)
df_genres.show()

In [None]:
from pyspark.sql import functions as F

# Split each "listed_in" value on ", " and emit one row per resulting genre.
genres_exploded = df_genres.select(
    F.explode(F.split("listed_in", ", ")).alias("genre")
)

# Count rows per genre, most frequent first.
genre_counts = genres_exploded.groupBy("genre").count().orderBy(F.desc("count"))

genre_counts.show()

In [None]:
# Read the "country" column of the chosen service's table over JDBC.
service = "netflix"
query = f"(SELECT country FROM {service}) AS newTable"

country_df = spark.read.jdbc(DATABASE_URL, query, properties=properties)

# Discard rows whose country is the literal "unknown".
country_df = country_df.filter(F.col("country") != "unknown")

# Split each value on ", " so every listed country gets its own row.
countries_exploded = country_df.withColumn(
    "country", F.explode(F.split("country", ", "))
)

# Count rows per country and keep the 20 most frequent.
top_countries = (
    countries_exploded.groupBy("country")
    .count()
    .orderBy("count", ascending=False)
    .limit(20)
)

top_countries.show()