In [1]:
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 54.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=17f79a067f54116c1bee5c1b8ef67cb0b0d9b61390bb89e1b3d0245e1ce53c77
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [11]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
from pyspark.sql.functions import col, asc,desc

In [12]:
# Start (or reuse) the Spark session for this notebook and keep a handle on
# its SparkContext, which is needed later for broadcasting.
spark = (
    SparkSession.builder
    .appName('PySpark-example1')
    .getOrCreate()
)
sc = spark.sparkContext

In [13]:
# Explicit schema for the ratings file: four nullable columns, in file order.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("uID", IntegerType()),
        ("mID", IntegerType()),
        ("rating", IntegerType()),
        ("time", LongType()),
    )
])


In [14]:
# Load the tab-separated ratings file with the explicit schema defined above,
# then preview the first rows.
df = spark.read.csv("/content/u.data", schema=schema, sep="\t")
df.show()

+---+----+------+---------+
|uID| mID|rating|     time|
+---+----+------+---------+
|196| 242|     3|881250949|
|186| 302|     3|891717742|
| 22| 377|     1|878887116|
|244|  51|     2|880606923|
|166| 346|     1|886397596|
|298| 474|     4|884182806|
|115| 265|     2|881171488|
|253| 465|     5|891628467|
|305| 451|     3|886324817|
|  6|  86|     3|883603013|
| 62| 257|     2|879372434|
|286|1014|     5|879781125|
|200| 222|     5|876042340|
|210|  40|     3|891035994|
|224|  29|     3|888104457|
|303| 785|     3|879485318|
|122| 387|     5|879270459|
|194| 274|     2|879539794|
|291|1042|     4|874834944|
|234|1184|     2|892079237|
+---+----+------+---------+
only showing top 20 rows



In [17]:
# Count the rating rows per movie id and order so the most-rated movies come first.
topmovies = (
    df.groupBy("mID")
    .count()
    .orderBy(desc("count"))
)
topmovies.show(3)

+---+-----+
|mID|count|
+---+-----+
| 50|  583|
|258|  509|
|100|  508|
+---+-----+
only showing top 3 rows



In [26]:
# Load the pipe-separated movie metadata file. No schema or header is supplied,
# so Spark names the columns _c0, _c1, ... and reads every field as a string.
# NOTE(review): the file encoding is not specified here (reader default applies);
# if any titles display garbled characters, confirm the file's encoding and
# pass it via the `encoding` option.
df1 = spark.read.csv("/content/u.item", sep="|")
df1.show(3)

+---+-----------------+-----------+----+--------------------+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|_c0|              _c1|        _c2| _c3|                 _c4|_c5|_c6|_c7|_c8|_c9|_c10|_c11|_c12|_c13|_c14|_c15|_c16|_c17|_c18|_c19|_c20|_c21|_c22|_c23|
+---+-----------------+-----------+----+--------------------+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
|  1| Toy Story (1995)|01-Jan-1995|null|http://us.imdb.co...|  0|  0|  0|  1|  1|   1|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|
|  2| GoldenEye (1995)|01-Jan-1995|null|http://us.imdb.co...|  0|  1|  1|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|
|  3|Four Rooms (1995)|01-Jan-1995|null|http://us.imdb.co...|  0|  0|  0|  0|  0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   0|   1|   0|   0|
+---+-----------------+-----------+----+--------------------+---+---+---+---+---+----+--

In [32]:
# Build a {movie id (int): title} lookup on the driver.
# Only the first two columns (_c0 = id, _c1 = title) are used, so select them
# before collecting instead of pulling every column of every row to the driver.
mvname = {
    int(row[0]): row[1]
    for row in df1.select(df1.columns[:2]).collect()
}

In [33]:
# Broadcast the id -> title dictionary so the lookup UDF can read it on the
# executors through `name_dictionary.value`.
name_dictionary = spark.sparkContext.broadcast(mvname)

In [34]:
# Number of rating rows per movie id (unsorted); result columns are `mID` and `count`.
movie_count = (
    df
    .groupBy("mID")
    .count()
)

In [35]:
from pyspark.sql import functions as func

In [36]:
# Function (wrapped as a Spark UDF elsewhere) that looks up movie names from
# our broadcasted dictionary.
def lookupName(movieID):
    """Return the title for ``movieID`` from the broadcast name dictionary.

    Args:
        movieID: Integer movie id used as the dictionary key. May be ``None``
            when the source row had a null ``mID``.

    Returns:
        The movie title, or ``None`` when the id is missing from the
        dictionary (which Spark renders as a null ``movieTitle``).
    """
    # .get() instead of [] so an unknown or null id yields a null title
    # rather than a KeyError that would fail the whole Spark job.
    return name_dictionary.value.get(movieID)

In [37]:
# Wrap the Python lookup as a Spark UDF. The return type is spelled out here;
# "string" is also what `udf` uses when no return type is given.
lookupNameUDF = func.udf(lookupName, returnType="string")

In [38]:
# Attach a movieTitle column by running each row's mID through the lookup UDF.
moviesWithNames = movie_count.withColumn(
    "movieTitle",
    lookupNameUDF(col("mID")),
)

In [39]:
# Order the titled counts so the most-rated movies come first, then show the top 4.
top_m_names = moviesWithNames.orderBy(desc("count"))
top_m_names.show(4)

+---+-----+--------------------+
|mID|count|          movieTitle|
+---+-----+--------------------+
| 50|  583|    Star Wars (1977)|
|258|  509|      Contact (1997)|
|100|  508|        Fargo (1996)|
|181|  507|Return of the Jed...|
+---+-----+--------------------+
only showing top 4 rows

