### This is my First Notebook

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, BooleanType, DateType
from pyspark.sql.functions import col, date_format 

In [0]:
# List the secret scopes visible to this workspace; the bare last expression
# is rendered as the cell output.
secret_scopes = dbutils.secrets.listScopes()
secret_scopes

In [0]:
# Mount the Azure Blob Storage container so later cells can read it through a
# DBFS path. The storage account key is fetched from a Databricks secret scope
# instead of being hard-coded in the notebook.
#
# dbutils.fs.mount raises when the mount point is already in use, so the mount
# is only created when it is not listed yet. This keeps the cell safe to re-run.
MOUNT_POINT = '/mnt/project-movies-data'

if not any(mount.mountPoint == MOUNT_POINT for mount in dbutils.fs.mounts()):
    dbutils.fs.mount(
        source='wasbs://moviesdataproject@moviesdatajcs.blob.core.windows.net',
        mount_point=MOUNT_POINT,
        extra_configs={
            'fs.azure.account.key.moviesdatajcs.blob.core.windows.net':
                dbutils.secrets.get('movieDataSecretScope', 'moviesStorageAccountKey')
        },
    )

In [0]:
# List the contents of the mounted container (Python equivalent of the
# `%fs ls` shorthand) and render the result as a table.
display(dbutils.fs.ls("/mnt/project-movies-data"))

In [0]:
# First attempt at loading the action CSV: no "header" option is set here,
# so the reader uses its default header handling.
action = spark.read.load("/mnt/project-movies-data/raw data/action.csv", format="csv")

In [0]:
# Preview the DataFrame (show()'s defaults spelled out: 20 rows, long values truncated).
action.show(n=20, truncate=True)

In [0]:
# Reload the action CSV, this time using the first row as column names.
action = (
    spark.read
    .option("header", "true")
    .csv("/mnt/project-movies-data/raw data/action.csv")
)

In [0]:
def _read_csv_with_header(path):
    """Load one CSV file into a DataFrame, using its first row as column names."""
    return spark.read.format("csv").option("header", "true").load(path)


# One DataFrame per genre, all read from the "raw data" folder of the mount.
action = _read_csv_with_header("/mnt/project-movies-data/raw data/action.csv")
adventure = _read_csv_with_header("/mnt/project-movies-data/raw data/adventure.csv")
thriller = _read_csv_with_header("/mnt/project-movies-data/raw data/thriller.csv")
horror = _read_csv_with_header("/mnt/project-movies-data/raw data/horror.csv")
scifi = _read_csv_with_header("/mnt/project-movies-data/raw data/scifi.csv")

In [0]:
# Print the column names and types of the action DataFrame as currently loaded.
action.printSchema()

In [0]:
# Replace the "rating" column with an integer version of itself.
# NOTE(review): an integer cast discards any fractional part of a rating.
# Confirm that whole-number ratings are intended here.
rating_as_int = col("rating").cast("int")
action = action.withColumn("rating", rating_as_int)

In [0]:
# Print the schema again to inspect the column types of the current action DataFrame.
action.printSchema()

In [0]:
# The 20 action movies with the highest rating.
# DataFrame.show() only prints and returns None, so the DataFrame is stored
# first and displayed in a separate statement; chaining .show() into the
# assignment would leave the variable holding None.
action_movies_rated_highest = action.orderBy("rating", ascending=False).limit(20)
action_movies_rated_highest.show()

In [0]:
# The 15 highest-rated action movies, restricted to name, genre and rating.
# The DataFrame is assigned before calling show(), because show() returns None
# and would otherwise overwrite the variable with None.
action_movies_rated_highest = (
    action.orderBy("rating", ascending=False)
    .select("movie_name", "genre", "rating")
    .limit(15)
)
action_movies_rated_highest.show()

In [0]:
# Up to 15 rows of the action dataset whose genre text contains "Comedy".
# show() returns None, so keep the DataFrame in the variable and display it
# in a separate statement.
comedy_movies = action.filter(col("genre").contains("Comedy")).limit(15)
comedy_movies.show()

In [0]:
# Persist the action DataFrame as CSV (with a header row) in the "transform data" folder.
# mode("overwrite") replaces any previous export; with the default save mode the
# write raises once the target path exists, which breaks re-running the notebook.
(
    action.write
    .mode("overwrite")
    .option("header", 'True')
    .csv("/mnt/project-movies-data/transform data/action")
)

In [0]:
# Show the schema as it is before the conversion below, then replace the
# "year" column with a date-typed version of itself.
action.printSchema()
year_as_date = col("year").cast("date")
action = action.withColumn("year", year_as_date)

In [0]:
# Format "year" as a four-digit year string, then store it as an integer.
year_number = date_format(col("year"), "yyyy").cast("int")
action = action.withColumn("year", year_number)

action.show()

In [0]:
# The 15 highest-rated action movies (name, genre, rating only).
# show() prints and returns None, so the DataFrame is assigned first and
# displayed separately; the variable then stays usable afterwards.
action_movies_rated_highest = (
    action.orderBy("rating", ascending=False)
    .select("movie_name", "genre", "rating")
    .limit(15)
)
action_movies_rated_highest.show()

In [0]:
# Cast the adventure ratings to integers, then pick the 20 highest-rated movies.
adventure = adventure.withColumn("rating", col("rating").cast(IntegerType()))

# show() returns None, so the DataFrame is stored before it is displayed.
adventure_movies_rated_highest = (
    adventure.orderBy("rating", ascending=False)
    .select("movie_name", "genre", "rating")
    .limit(20)
)
adventure_movies_rated_highest.show()

In [0]:
# Export the 20 highest-rated adventure movies. The target folder is named
# "adventure_top_20", but the whole DataFrame was being written before; the
# rows are now limited to the top 20 by rating (all columns are kept).
# mode("overwrite") lets the cell be re-run; the default mode raises if the path exists.
adventure_top_20 = adventure.orderBy("rating", ascending=False).limit(20)
(
    adventure_top_20.write
    .mode("overwrite")
    .option("header", 'True')
    .csv("/mnt/project-movies-data/transform data/adventure_top_20")
)

In [0]:
# For thriller and horror: cast ratings to integers, show the 20 highest-rated
# movies, and export those 20 rows.
#
# Two fixes compared with chaining everything in one line:
#   * show() returns None, so the DataFrames are assigned first and displayed
#     in a separate statement.
#   * The "*_top_20" folders now receive only the top 20 rows (all columns)
#     instead of the whole DataFrame. mode("overwrite") keeps the cell
#     re-runnable, because the default mode raises if the path exists.
thriller = thriller.withColumn("rating", col("rating").cast(IntegerType()))
thriller_top_20 = thriller.orderBy("rating", ascending=False).limit(20)
thriller_movies_rated_highest = thriller_top_20.select("movie_name", "genre", "rating")
thriller_movies_rated_highest.show()
(
    thriller_top_20.write
    .mode("overwrite")
    .option("header", 'True')
    .csv("/mnt/project-movies-data/transform data/thriller_top_20")
)

horror = horror.withColumn("rating", col("rating").cast(IntegerType()))
horror_top_20 = horror.orderBy("rating", ascending=False).limit(20)
horror_movies_rated_highest = horror_top_20.select("movie_name", "genre", "rating")
horror_movies_rated_highest.show()
(
    horror_top_20.write
    .mode("overwrite")
    .option("header", 'True')
    .csv("/mnt/project-movies-data/transform data/horror_top_20")
)

In [0]:
# Format the sci-fi "year" column as a four-digit year and store it as an integer.
scifi_year_number = date_format(col("year"), "yyyy").cast("int")
scifi = scifi.withColumn("year", scifi_year_number)

scifi.show()

In [0]:
# The 20 most recent sci-fi movies (name, year, rating).
# show() returns None, so the DataFrame is assigned first and displayed
# separately; otherwise the variable would hold None.
scifi_newest_movies = (
    scifi.orderBy("year", ascending=False)
    .select("movie_name", "year", "rating")
    .limit(20)
)
scifi_newest_movies.show()

In [0]:
# Export the 20 most recent sci-fi movies. The target folder is named
# "scifi_newest_20", but the whole DataFrame was being written before; the rows
# are now limited to the 20 with the highest year (all columns are kept).
# mode("overwrite") lets the cell be re-run; the default mode raises if the path exists.
scifi_newest_20 = scifi.orderBy("year", ascending=False).limit(20)
(
    scifi_newest_20.write
    .mode("overwrite")
    .option("header", 'True')
    .csv("/mnt/project-movies-data/transform data/scifi_newest_20")
)

### Part 3

## Spark DataFrames in Databricks are used by data engineers and analysts, but in slightly different ways!

Think of a Spark DataFrame in Databricks as a supercharged spreadsheet: it organises data in rows and columns (like Excel), but it can handle massive datasets, which makes it well suited to large-scale analysis.

Spark DataFrames are used for a wide range of tasks involving distributed data processing and analysis on large datasets. Here are some of their key applications: 

### Data wrangling and transformation:

- **Loading data from various sources:** DataFrames can read data from diverse sources like files (CSV, JSON, Parquet), databases, and existing Spark datasets. 

- **Cleaning and filtering data:** You can manipulate data using DataFrame operations like filtering rows based on conditions, handling missing values, and performing aggregations. 

- **Joining and merging data:** DataFrames allow you to join multiple datasets based on shared columns, creating more informative datasets for analysis. 

### Data analysis and exploration:

- **Performing SQL-like queries:** You can use SQL syntax to query and analyse data within DataFrames, which is handy if you are already familiar with relational databases.

- **Statistical analysis and machine learning:** DataFrames provide functions for calculating statistics, performing regressions, and building machine learning models on large datasets. 

- **Data visualisation:** Query results can be converted to pandas (for example with `toPandas()`) and passed to visualisation libraries such as Matplotlib or Plotly in Python, or ggplot2 in R, enabling you to create insightful charts and graphs for exploring data trends and patterns.

## More Queries on Your DataFrame & Using Plotly

In [0]:
# Query the action dataset with Spark SQL and chart, per year, how many movies
# have a rating of exactly 8.

# Plotly Express is used for the chart at the end of this cell.
import plotly.express as px

# Read the CSV file as a Spark DataFrame, letting Spark infer the column types.
# The DataFrame is assigned BEFORE registering the view: createOrReplaceTempView()
# returns None, so chaining it into the assignment would set `action` to None.
action = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/mnt/project-movies-data/raw data/action.csv")
)
action.createOrReplaceTempView("temp_table")

# Create a Spark table from the temporary view.
# IF NOT EXISTS means an existing `actiontb` table is left untouched on re-runs.
spark.sql("CREATE TABLE IF NOT EXISTS actiontb USING parquet AS SELECT * FROM temp_table")

# Query for movies with rating = 8
query_result = spark.sql("SELECT year, movie_name, rating FROM actiontb WHERE rating = 8")

# Bring the query result to the driver as a pandas DataFrame for plotting
pandas_df = query_result.toPandas()

# Group by year, count movies, and create a DataFrame with "year" and "count" columns
grouped_df = pandas_df.groupby("year").size().to_frame(name="count").reset_index()

# Create the bar chart using Plotly (bars coloured by their count)
fig = px.bar(
    grouped_df,
    x="year",
    y="count",
    color="count",
    color_continuous_scale="Viridis",
    title="Action movies with rating = 8, per year",
)

# Set plot size
fig.update_layout(width=900, height=600)

# Display the plot
fig.show()