In [0]:
#Import Spark Session - Encompasses SparkContext & SQLContext 
from pyspark.sql import SparkSession
#SQL functions
from pyspark.sql import functions as fx
#Get the raw data ----> I have used Databricks Filestore to store my files
sourcepath_principle = "dbfs:/FileStore/tables/title_principals.tsv"
sourcepath_Basics = "dbfs:/FileStore/tables/name_basics.tsv"

#read the tsv using the spark read function and store it into the dataframe
LoadingTheFile_principle = spark.read.csv(sourcepath_principle,sep = "\t",header = True,inferSchema = True)
LoadingTheFile_basic = spark.read.csv(sourcepath_Basics,sep = "\t",header = True,inferSchema = True)

#Get the required parameter and value from both title and name dataframe
sel_LoadingTheFile_principle=LoadingTheFile_principle.select(fx.col("tconst"),fx.col("nconst"),fx.col("category"))
sel_LoadingTheFile_basic=LoadingTheFile_basic.select(fx.col("nconst"),fx.col("primaryName"))

#join both the data frame using the nconst
joinedF = sel_LoadingTheFile_principle.join(sel_LoadingTheFile_basic , ["nconst"])

#get only actor and director data from the resltset
join_act_dir = joinedF.where((fx.col("category") == fx.lit("actor")) | (fx.col("category") == fx.lit("director"))) 

#fetching only actor data from the category 
df_with_actor = join_act_dir.withColumn("Actor",(fx.when(fx.col("category") == fx.lit("actor") , fx.col("category")))).where(fx.col("Actor").isNotNull()).withColumnRenamed("primaryName","actorName")

#fetching only director data from the category
df_with_dir = join_act_dir.withColumn("Director",(fx.when(fx.col("category") == fx.lit("director") , fx.col("category")))).where(fx.col("Director").isNotNull()).withColumnRenamed("primaryName","directorName")

#joining all the dataframes using the tconst
joinedADd = join_act_dir.join(df_with_actor,['tconst'] , 'left').join(df_with_dir,['tconst'] , 'left')

#selecting the required parameters 
club_act=joinedADd.where((fx.col("actorName").isNotNull()) & (fx.col("directorName").isNotNull())).select(fx.col("tconst"),fx.col("actorName"),fx.col("directorName"))

#fetching the count of collabration between actor and director using groupBy and aggregate function
finalResult = club_act.groupBy(fx.col("actorName"),fx.col("directorName")).agg(fx.count("directorName").alias("count"))

#Order by high to low 
final =finalResult.orderBy(fx.col("count").desc())

#show top ten collabrations
finalSet = final.head(10)
display(finalSet)

actorName,directorName,count
Luis Eduardo Motoa,Luizi Agudelo,14449
Luis Eduardo Motoa,Roberto Reyes,14449
Luis Eduardo Motoa,Noé Salazar,14449
Dilip Joshi,Harshad Joshi,14411
Sebastian Hofmeyr,Henry Mylne,13263
Sebastian Hofmeyr,Gert van Niekerk,13263
Dilip Joshi,Dharmessh Mehta,12747
Jef Desmedt,Frank Tulkens,10522
Peter Hobbs,Gloria Monty,10011
Ravi Kiran,Narayana Rao Dasari,9756


In [0]:
club_act.registerTempTable("join_table")
a = spark.sql("select actorName,directorName,count(directorName) from join_table group by actorName,directorName having directorName = 'Narayana Rao Dasari'")
display(a)

actorName,directorName,count(directorName)
Raogopalrao,Narayana Rao Dasari,19
Sanjeev Kumar,Narayana Rao Dasari,7
Pran,Narayana Rao Dasari,10
Rama Rao Gokineni,Narayana Rao Dasari,10
Ramakrishna,Narayana Rao Dasari,13
Krishna Ghattamaneni,Narayana Rao Dasari,17
Ranganath,Narayana Rao Dasari,5
Mada,Narayana Rao Dasari,11
Murali Mohan,Narayana Rao Dasari,65
Giri Babu,Narayana Rao Dasari,12
