In [1]:
import time
start_time = time.time()

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType, DoubleType, DateType
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import SQLTransformer
from pyspark.ml.feature import CountVectorizer
from pyspark.sql import functions as F

import pandas as pd
import re

# getOrCreate() hands back the session already running in this kernel instead
# of raising "Cannot run multiple SparkContexts at once" when the cell is re-run.
spark = SparkSession.builder.master('local').appName('ETL_Job').getOrCreate()
# Kept for any code that still talks to the SparkContext directly.
sc = spark.sparkContext

In [2]:
# Target types for the columns used downstream; `_id` is a struct holding a
# single `$oid` string field.
schema = StructType([
    StructField("_id", StructType([StructField("$oid", StringType())]), True),
    StructField("title", StringType(), True),
    StructField("genres", ArrayType(StringType()), True),
    StructField("plot", StringType(), True),
    StructField("cast", ArrayType(StringType()), True),
    StructField("directors", ArrayType(StringType()), True)])

selected_columns = ['_id', 'title', 'genres', 'plot', 'cast', 'directors']

# `inferSchema` is a CSV reader option; the JSON reader infers types on its own.
df = spark.read.json(r'source_files/movies.json').select(selected_columns)

# DataFrame.to() returns a NEW DataFrame reconciled to `schema`. The result
# used to be discarded, so the schema was never actually applied to `df`.
df = df.to(schema)
df

DataFrame[_id: struct<$oid:string>, title: string, genres: array<string>, plot: string, cast: array<string>, directors: array<string>]

In [3]:
# Maximum number of entries kept from each feature list column.
MAX_LIST_ITEMS = 5

@udf(ArrayType(StringType()))
def get_list(x):
    """Keep at most MAX_LIST_ITEMS truthy entries of an array value.

    Falsy entries (None and empty strings) are dropped before truncating.
    Any non-list input (e.g. a null column value) yields an empty list.
    """
    if not isinstance(x, list):
        return []
    # Slicing already handles lists shorter than the limit.
    return [name for name in x if name][:MAX_LIST_ITEMS]

featured_columns = ['genres', 'cast', 'directors']

for column in featured_columns:
    df = df.withColumn(column, get_list(col(column)))

In [4]:
@udf(ArrayType(StringType()))
def clean_data(row):
    """Normalise tokens: strip non-alphanumerics, lower-case, drop empties.

    Accepts a list of strings or a single string; any other value (e.g. null)
    yields an empty list. None elements inside a list are skipped.

    `[\\W_]` (non-word characters plus underscore) is Unicode-aware for str
    patterns, so letters and digits in any script are kept. The previous
    ASCII-only class erased e.g. accented or CJK characters, turning names in
    non-Latin scripts into "" tokens shared by otherwise unrelated movies.
    """
    if isinstance(row, str):
        row = [row]
    elif not isinstance(row, list):
        return []
    cleaned = (re.sub(r'[\W_]', '', item).lower() for item in row if item is not None)
    # Drop tokens that were made entirely of punctuation/whitespace.
    return [token for token in cleaned if token]

for column in featured_columns:
    df = df.withColumn(column, clean_data(col(column)))

In [5]:
@udf(ArrayType(StringType()))
def merge_cols(genres, cast, directors):
    """Concatenate the three token lists into a single bag of tokens.

    A null column value (None on the Python side) is treated as an empty list
    instead of raising TypeError inside the Spark task.
    """
    return [token for tokens in (genres, cast, directors) for token in (tokens or [])]

df = df.withColumn('tokens', merge_cols('genres', 'cast', 'directors'))

df_2 = df.select('title', 'tokens')

In [7]:
from pyspark.ml.feature import Normalizer

start_time = time.time()
# Use CountVectorizer to convert the token bags to term-count vectors
cv = CountVectorizer(inputCol="tokens", outputCol="features")
cv_model = cv.fit(df_2)
count_matrix = cv_model.transform(df_2)

# Cosine similarity is the dot product of L2-normalised vectors. Dotting the
# raw count vectors (as before) gives an unnormalised overlap score that grows
# with the number of tokens, not a cosine. Normalising once per row is cheaper
# than dividing by both norms inside the per-pair UDF.
normalizer = Normalizer(inputCol="features", outputCol="unit_features", p=2.0)
unit_matrix = normalizer.transform(count_matrix)

dot_udf = F.udf(lambda x, y: float(x.dot(y)), 'double')
# NOTE(review): pairs are keyed on title, so distinct movies sharing a title are
# never compared and duplicated titles repeat pairs; a unique id column would be
# a safer join key. This self-join is quadratic in the number of rows.
cosine_sim = unit_matrix.alias("i").join(unit_matrix.alias("j"), F.col("i.title") < F.col("j.title")) \
    .select(
        F.col("i.title").alias("title_i"),
        F.col("j.title").alias("title_j"),
        dot_udf("i.unit_features", "j.unit_features").alias("cosine_similarity")
    )
print(cosine_sim.count())
print(time.time() - start_time)

277028195
19.88704562187195


In [8]:
# Counts the similarity pairs again. No cache()/persist() is applied to
# `cosine_sim` in this notebook, so this re-executes the full self-join.
cosine_sim.count()

277028195

In [3]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# getOrCreate() reuses a session already running in this kernel instead of
# raising "Cannot run multiple SparkContexts at once"; on a fresh kernel it
# starts a new local session named 'test'.
spark = SparkSession.builder.master('local').appName('test').getOrCreate()
sc = spark.sparkContext

In [19]:
import os

# The JSON reader infers the schema itself (`inferSchema` is a CSV option).
df1 = spark.read.json(r'source_files/movies(mongodb).json')

# Location of the TMDB 5000 movies CSV; set TMDB_MOVIES_CSV to override.
# The fallback is the original hardcoded local path.
tmdb_csv_path = os.getenv(
    'TMDB_MOVIES_CSV',
    r"C:/Users/mohammbaig/Documents/My files/Python/Mini_projects/Movie_recommender/tmdb_5000_movies.csv",
)
# multiLine + escape='"' let quoted fields span lines and contain doubled quotes.
df2 = spark.read.csv(tmdb_csv_path, header=True, multiLine=True, escape='"', inferSchema=True)

In [20]:
# Show the inferred type of the join key in the MongoDB export.
df1.select(df1['title']).printSchema()

root
 |-- title: string (nullable = true)



In [21]:
# Show the inferred type of the join key in the TMDB CSV.
df2.select(df2['title']).printSchema()

root
 |-- title: string (nullable = true)



In [22]:
# DataFrame.alias() returns a new DataFrame. The results used to be discarded,
# so the aliases never took effect; rebind the names so qualified references
# such as col('df1.title') resolve.
df1 = df1.alias("df1")
df2 = df2.alias("df2")
df2

DataFrame[budget: int, genres: string, homepage: string, id: int, keywords: string, original_language: string, original_title: string, overview: string, popularity: double, production_companies: string, production_countries: string, release_date: date, revenue: bigint, runtime: double, spoken_languages: string, status: string, tagline: string, title: string, vote_average: double, vote_count: int]

In [13]:
# Joining on the column name keeps a single `title` column in the result; the
# equality-expression form keeps both sides' `title`, making a later
# reference to 'title' ambiguous.
join = df1.join(df2, on='title', how='inner')

In [23]:
# Counts printed before and after de-duplication, in order:
# all rows, distinct rows, rows of the title projection, distinct titles.
print(*(frame.count() for frame in (df1, df1.distinct(), df1.select('title'), df1.select('title').distinct())))
# NOTE(review): no ordering is applied, so which row survives for a repeated
# title is not controlled here (remakes share titles) — confirm this is acceptable.
df1 = df1.dropDuplicates(['title'])
print(*(frame.count() for frame in (df1, df1.distinct(), df1.select('title'), df1.select('title').distinct())))

21349 21349 21349 19744
19744 19744 19744 19744


In [24]:
# Counts printed before and after de-duplication, in order:
# all rows, distinct rows, rows of the title projection, distinct titles.
print(*(frame.count() for frame in (df2, df2.distinct(), df2.select('title'), df2.select('title').distinct())))
# NOTE(review): no ordering is applied, so which row survives for a repeated
# title is not controlled here — confirm this is acceptable.
df2 = df2.dropDuplicates(['title'])
print(*(frame.count() for frame in (df2, df2.distinct(), df2.select('title'), df2.select('title').distinct())))

4803 4803 4803 4800
4800 4800 4800 4800


In [28]:
df1.createOrReplaceTempView('df1')
df2.createOrReplaceTempView('df2')
# Every TMDB column plus the poster from the MongoDB export, for titles present
# in both sources.
join2 = spark.sql(
    '''
    select df2.*, df1.poster from df1 inner join df2 on df1.title = df2.title;
    '''
)
print(join2.count())

pdf = join2.toPandas()
# Write one title per line as plain text. Series.to_csv(sep='\n') still applies
# CSV quoting, so a title containing '"' was written wrapped in quotes with the
# inner quotes doubled.
with open('titles.txt', 'w', encoding='utf-8') as titles_file:
    titles_file.write(''.join(f'{title}\n' for title in pdf['title']))

3610


In [9]:
from pyspark.sql import SparkSession
from pyspark import SparkContext

# Reuse the running session if there is one; constructing another SparkContext
# in the same kernel raises "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master('local').appName('test').getOrCreate()
sc = spark.sparkContext

# One-row DataFrame to inspect how many partitions a tiny local frame gets.
df = spark.createDataFrame([{"name": "John", "age": 30}])
df.rdd.getNumPartitions()

1

In [8]:
# SparkSession.stop() stops the underlying SparkContext and also clears the
# session's active/default registration, rather than stopping only the context.
spark.stop()

In [2]:
# Write `df` as CSV (with a header row) into the "newfolder" directory,
# replacing earlier output. Requires a SparkContext that has not been stopped.
df.write.mode("overwrite").option("header", True).csv("newfolder")

In [3]:
from pymongo import MongoClient
import os
import re

# os.environ[...] raises KeyError when MONGO_URI is unset; os.getenv would pass
# None and MongoClient(None) silently connects to localhost:27017 instead.
connection = MongoClient(os.environ["MONGO_URI"])
db = connection["Recommendations_project"]
collection = db.get_collection("source_movies")

# Case-insensitive substring match; pymongo encodes the compiled pattern as a
# BSON regular expression.
regex_pattern = re.compile("the", re.IGNORECASE)
# Fields searched with the pattern; add e.g. "cast", "genres", "plot" or
# "directors" to widen the search.
search_fields = ["title"]
query = {"$or": [{field: {"$regex": regex_pattern}} for field in search_fields]}

cursor = collection.find(query).limit(10)
list(cursor)

[]