In [25]:
import findspark
findspark.init()

In [26]:
import pyspark # only run after findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.app.getOrCreate()
spark

AttributeError: 'Builder' object has no attribute 'app'

**Load Data**<br>
Download the Kaggle dataset: [Popular Movies Datasets](https://www.kaggle.com/datasets/whenamancodes/popular-movies-datasets-9000-movies)

In [None]:
movies= spark.read.csv('../Popular Movies Datasets/movies.csv',header=True)
links= spark.read.csv('../Popular Movies Datasets/links.csv',sep=',',header=True)



In [None]:
tags=spark.read.format('csv').option('delimiter',',').option('header','true')\
                .load('../Popular Movies Datasets/tags.csv')

ratings=spark.read.load('../Popular Movies Datasets/ratings.csv',format='csv',header='true')

**Inspect Data**

In [None]:


# Displays the content of dataframe
# movies.show(truncate=False)

# Return first n rows
# movies.head(1)

# Return first n rows
# ratings.take(2)

# Returns first row
# tags.first()

# Computes summary statistics
# links.describe().show()

# Returns columns of dataframe
# links.columns

# Counts the number of rows in dataframe
# movies.count()

# Counts the number of distinct rows in dataframe
# movies.distinct().count()



In [None]:
tags.columns

**Commonly Used Functions** <br>
1.  Remove Duplicate Values

In [None]:
tags.count()
tags_dedup = tags.dropDuplicates(['userId', 'movieId']) 
tags_dedup.count()


2. select and selectExpr 

In [None]:
movies.select('movieId',movies.title).show(2)
movies.selectExpr('movieId','upper(title) as somecolname').show(2)

3. **withColumn and withColumnRenamed** : Adds a column and Renames a column respectively

In [None]:
from pyspark.sql.functions import col,split

movies_1=movies.withColumn('newtitle',split(col('title'),r'\(')[0])
# movies_1.show()
movies_1=movies_1.withColumnRenamed('newtitle','movie_name')
movies_1.show()



4.  If Else statements

In [None]:
from pyspark.sql.functions import when
ratings_1=ratings.select("rating", when(ratings.rating == 5, True).otherwise(False))
# ratings_1.show(2)
ratings_1=ratings_1.withColumnRenamed('CASE WHEN (rating = 5) THEN true ELSE false END','five_ratings')
ratings_1.show(2)

ratings.withColumn('five_ratings',when(ratings.rating == 5, True).otherwise(False)).show(2)


**Droping Columns**

In [None]:
ratings.drop('userId','timestamp').show(2)
ratings.drop('userId').drop(col('timestamp')).drop(ratings.movieId).show(2)


**Filtering Rows**

In [None]:
ratings.filter(col('rating')>=4).show(10)

**Repartitioning**

In [None]:
movies1=movies.repartition(200)
print(movies1.rdd.getNumPartitions())

movies2=movies1.coalesce(3)
print(movies2.rdd.getNumPartitions())

**Spark SQL**

In [None]:
tags.createOrReplaceTempView('tags')
df_tags=spark.sql('select * from tags')
df_tags.show(2)

spark.sql("""select userId,count(*) as counts
                from tags 
            group by userId order by counts desc""").show(5)


**String Operations**

In [None]:
movies.filter(col('title').isin(['Toy Story (1995)','Money Train (1995)' ])).show()
movies.filter(col('genres').like('%Comedy%')).show(3)
movies.filter(col('title').startswith('The')).show(3)
movies.filter(col('title').endswith("(1995)")).show(3)

**Join Operation** <br>
Find userId with highest number of rating submission.

In [None]:
from pyspark.sql.functions import col,desc
movies.join(ratings,on='movieId',how='left').select('userId').groupBy('userId').count()\
            .orderBy((col('count').desc())).show()

**Write to Disk**

In [None]:
movies.write.csv('../output/movies',header=True,mode='overwrite')
tags.write.save('../output/tags',format='csv',header=True,mode='overwrite',compression='gzip')
links.write.format('csv').option('header','true').mode('overwrite').save('../output/links')