## Example_1

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructField, StructType,StringType,IntegerType,LongType

In [9]:
spark = SparkSession.builder.appName("movieApp").getOrCreate()
schema = StructType([
    StructField("user_id",StringType(),True),
    StructField("movie_id",StringType(),True),
    StructField("rating",IntegerType(),True),
    StructField("timestamp",LongType(),True)
])

In [10]:
movie_df = spark.read.option("sep","\t").schema(schema).csv("../DATA/ml-100k/u.data")

In [14]:
movie_df.groupBy("movie_id").count().orderBy(func.desc("count")).show()

+--------+-----+
|movie_id|count|
+--------+-----+
|      50|  583|
|     258|  509|
|     100|  508|
|     181|  507|
|     294|  485|
|     286|  481|
|     288|  478|
|       1|  452|
|     300|  431|
|     121|  429|
|     174|  420|
|     127|  413|
|      56|  394|
|       7|  392|
|      98|  390|
|     237|  384|
|     117|  378|
|     172|  367|
|     222|  365|
|     204|  350|
+--------+-----+
only showing top 20 rows



In [15]:
spark.stop()

# Example2

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructField, StructType,StringType,IntegerType,LongType

spark = SparkSession.builder.appName("movieApp").getOrCreate()

schema = StructType([
    StructField("user_id",StringType(),True),
    StructField("movie_id",StringType(),True),
    StructField("rating",IntegerType(),True),
    StructField("timestamp",LongType(),True)
])
movie_df = spark.read.option("sep","\t").schema(schema).csv("../DATA/ml-100k/u.data")

In [26]:
# brodcast 
def loadMovieNames():
    with open("../DATA/ml-100k/u.item","r",encoding="ISO-8859-1",errors="ignore") as file:
        lines = file.read().split("\n")
        dict_=dict()
        for line in lines:
#             print(line)
            try:
                fields = line.split("|")
                id = str(fields[0])            
                name = str(fields[1])
                dict_[id]=name
            except:
                print(len(line))
    return dict_
name_dict= spark.sparkContext.broadcast(loadMovieNames())
 

0


In [49]:
movie_count_df = movie_df.groupBy("movie_id").count()

In [34]:
def lookup(movieID):
    return name_dict.value[movieID]

In [35]:
lookup_udf = func.udf(lookup)

In [50]:
movie_count_df = movie_count_df.withColumn("name",lookup_udf(func.col("movie_id")))

In [51]:
movie_count_df = movie_count_df.select("name","count").orderBy(func.desc("count"))

In [53]:
movie_count_df.show()

+--------------------+-----+
|                name|count|
+--------------------+-----+
|    Star Wars (1977)|  583|
|      Contact (1997)|  509|
|        Fargo (1996)|  508|
|Return of the Jed...|  507|
|    Liar Liar (1997)|  485|
|English Patient, ...|  481|
|       Scream (1996)|  478|
|    Toy Story (1995)|  452|
|Air Force One (1997)|  431|
|Independence Day ...|  429|
|Raiders of the Lo...|  420|
|Godfather, The (1...|  413|
| Pulp Fiction (1994)|  394|
|Twelve Monkeys (1...|  392|
|Silence of the La...|  390|
|Jerry Maguire (1996)|  384|
|    Rock, The (1996)|  378|
|Empire Strikes Ba...|  367|
|Star Trek: First ...|  365|
|Back to the Futur...|  350|
+--------------------+-----+
only showing top 20 rows



## example 3 populat superhero

In [25]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructField, StructType,StringType,IntegerType,LongType

spark = SparkSession.builder.appName("superhero").getOrCreate()

schema = StructType([
    StructField("ID",StringType(),True),
    StructField("name",StringType(),True)
])
names = spark.read.schema(schema).option("sep"," ").csv("/media/user/Files/sparkCourse/DATA/2015-10-01_13-52-12__Marvel-names.txt")

In [26]:
names.show()

+---+--------------------+
| ID|                name|
+---+--------------------+
|  1|24-HOUR MAN/EMMANUEL|
|  2|3-D MAN/CHARLES CHAN|
|  3|    4-D MAN/MERCURIO|
|  4|             8-BALL/|
|  5|                   A|
|  6|               A'YIN|
|  7|        ABBOTT, JACK|
|  8|             ABCISSA|
|  9|                ABEL|
| 10|ABOMINATION/EMIL BLO|
| 11|ABOMINATION | MUTANT|
| 12|         ABOMINATRIX|
| 13|             ABRAXAS|
| 14|          ADAM 3,031|
| 15|             ABSALOM|
| 16|ABSORBING MAN/CARL C|
| 17|ABSORBING MAN | MUTA|
| 18|                ACBA|
| 19|ACHEBE, REVEREND DOC|
| 20|            ACHILLES|
+---+--------------------+
only showing top 20 rows



In [10]:
ids = spark.read.text("/media/user/Files/sparkCourse/DATA/2015-10-01_13-52-05__Marvel-graph.txt")

In [15]:
ids = ids.withColumn("id",func.split(func.col("value")," ")[0])\
        .withColumn("connections",func.size(func.split(func.col("value")," "))-1)

In [17]:
ids =ids.groupby("id").agg(func.sum("connections").alias("connections"))

In [20]:
ids = ids.sort(func.desc("connections"))
ids.show()

+----+-----------+
|  id|connections|
+----+-----------+
| 859|       1937|
|5306|       1745|
|2664|       1532|
|5716|       1429|
|6306|       1397|
|3805|       1389|
|2557|       1374|
|4898|       1348|
|5736|       1292|
| 403|       1283|
|6066|       1266|
|2650|       1247|
|2399|       1179|
|1289|       1107|
|5467|       1098|
| 133|       1097|
|6148|       1096|
| 154|       1095|
|5046|       1083|
|1602|       1082|
+----+-----------+
only showing top 20 rows



In [22]:
ids.first()[0]

'859'

In [28]:
names.filter(func.col("ID")==ids.first()[0]).select("name").first()

Row(name='CAPTAIN AMERICA')

## Excerise 1

In [42]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StringType,StructField,StructType

spark = SparkSession.builder.appName("ObsecureHeroes").getOrCreate()

schema = StructType([
StructField("ID",StringType(),True),
StructField("name",StringType(),True)]
    
)
names = spark.read.option("sep"," ").schema(schema).csv("/media/user/Files/sparkCourse/DATA/2015-10-01_13-52-12__Marvel-names.txt")

In [74]:
ids = spark.read.text("/media/user/Files/sparkCourse/DATA/2015-10-01_13-52-05__Marvel-graph.txt")


ids = ids.withColumn("ID",func.split(func.col("value")," ")[0])\
.withColumn("connections",func.size(func.split(func.col("value")," "))-1)\
.groupby("ID").agg(func.sum("connections").alias("connections"))\
.sort(func.col("connections"))

In [80]:
df = ids.join(names,['ID'],how = "right")\
    .sort(func.col("connections"))\
    .na.fill(value=0)\
    .filter(func.col("connections")<=1)


In [81]:
df =df.toPandas()

In [82]:
df

Unnamed: 0,ID,connections,name
0,6487,0,AA2 35
1,6488,0,M/PRM 35
2,6489,0,M/PRM 36
3,6490,0,M/PRM 37
4,6491,0,WI? 9
...,...,...,...
12956,4602,1,RED WOLF II
12957,4784,1,RUNE
12958,4945,1,SEA LEOPARD
12959,5028,1,SHARKSKIN
