In [27]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType , LongType
from pyspark.sql import Row

In [None]:
spark = SparkSession.builder.appName('SparkSQL').getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID = int(fields[0]),\
               name = str(fields[1].encode('utf-8')),\
               age = int(fields[2]),\
               numFriends=int(fields[3]))

lines = spark.sparkContext.textFile('Data/fakefriends.csv')

people = lines.map(mapper)


schemaPeople = spark.createDataFrame(people).cache()
schemaPeople.createOrReplaceTempView('people')

teenagers = spark.sql('select * from people where age >=13')

for teen in teenagers.collect():
    print (teen)

In [None]:
schemaPeople.groupBy('age').count().orderBy('Age').show()

In [None]:


spark = SparkSession.builder.appName("MinTemperatures").getOrCreate()

schema = StructType([ \
                     StructField("stationID", StringType(), True), \
                     StructField("date", IntegerType(), True), \
                     StructField("measure_type", StringType(), True), \
                     StructField("temperature", FloatType(), True)])

# // Read the file as dataframe
df = spark.read.schema(schema).csv("Data/1800.csv")
df.printSchema()


                                                  

In [None]:
# Filter out all but TMIN entries
minTemps = df.filter(df.measure_type == "TMIN")
# Select only stationID and temperature
stationTemps = minTemps.select("stationID", "temperature")
# Aggregate to find minimum temperature for every station
minTempsByStation = stationTemps.groupBy("stationID").min("temperature")
minTempsByStation.show()

# Convert temperature to fahrenheit and sort the dataset
minTempsByStationF = minTempsByStation.withColumn("temperature",
                                                  func.round(func.col("min(temperature)") * 0.1 * (9.0 / 5.0) + 32.0, 2))\
                                                  .select("stationID", "temperature").sort("temperature")                                              
# Collect, format, and print the results
results = minTempsByStationF.collect()
for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

In [15]:
schema_movie = StructType([StructField('userID',IntegerType(),True),\
                          StructField('movieID',IntegerType(),True),\
                          StructField('rating',IntegerType(),True),\
                          StructField('timestamp',IntegerType(),True)])
df_movies = spark.read.option('delimiter','\t').schema(schema_movie).csv("ml-100k/u.data")
result = df_movies.groupBy('movieID').count().orderBy(func.desc('count'))
# result.collect()

In [28]:
import codecs

def loadMovieNames():
    movieNames = {}
    # CHANGE THIS TO THE PATH TO YOUR u.ITEM FILE:
    with codecs.open("ml-100k/u.ITEM", "r", encoding='ISO-8859-1', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

nameDict = spark.sparkContext.broadcast(loadMovieNames())

# Create schema when reading u.data
schema = StructType([ \
                     StructField("userID", IntegerType(), True), \
                     StructField("movieID", IntegerType(), True), \
                     StructField("rating", IntegerType(), True), \
                     StructField("timestamp", LongType(), True)])

# Load up movie data as dataframe
moviesDF = spark.read.option("sep", "\t").schema(schema).csv("ml-100k/u.data")

movieCounts = moviesDF.groupBy("movieID").count()

# Create a user-defined function to look up movie names from our broadcasted dictionary
# def lookupName(movieID):
#     return nameDict.value[movieID]

# lookupNameUDF = func.udf(lookupName)
lookupNameUDF = func.udf(lambda x: nameDict.value[x])

# Add a movieTitle column using our new udf
moviesWithNames = movieCounts.withColumn("movieTitle", lookupNameUDF(col("movieID")))

# Sort the results
sortedMoviesWithNames = moviesWithNames.orderBy(func.desc("count"))

# Grab the top 10
sortedMoviesWithNames.show(10, False)

+-------+-----+-----------------------------+
|movieID|count|movieTitle                   |
+-------+-----+-----------------------------+
|50     |583  |Star Wars (1977)             |
|258    |509  |Contact (1997)               |
|100    |508  |Fargo (1996)                 |
|181    |507  |Return of the Jedi (1983)    |
|294    |485  |Liar Liar (1997)             |
|286    |481  |English Patient, The (1996)  |
|288    |478  |Scream (1996)                |
|1      |452  |Toy Story (1995)             |
|300    |431  |Air Force One (1997)         |
|121    |429  |Independence Day (ID4) (1996)|
+-------+-----+-----------------------------+
only showing top 10 rows



In [None]:
loadMovieNames()

In [31]:
spark = SparkSession.builder.appName("MostPopularSuperhero").getOrCreate()

schema = StructType([ \
                     StructField("id", IntegerType(), True), \
                     StructField("name", StringType(), True)])

names = spark.read.schema(schema).option("sep", " ").csv("Data/Marvel+Names")

lines = spark.read.text("Data/Marvel+Graph")
# Small tweak vs. what's shown in the video: we trim each line of whitespace as that could
# throw off the counts.
connections = lines.withColumn("id", func.split(func.trim(func.col("value")), " ")[0]) \
    .withColumn("connections", func.size(func.split(func.trim(func.col("value")), " ")) - 1) \
    .groupBy("id").agg(func.sum("connections").alias("connections"))
mostPopular = connections.sort(func.col("connections").desc()).first()
mostPopularName = names.filter(func.col("id") == mostPopular[0]).select("name").first()
print(mostPopularName[0] + " is the most popular superhero with " + str(mostPopular[1]) + " co-appearances.")

CAPTAIN AMERICA is the most popular superhero with 1933 co-appearances.


In [34]:
spark = SparkSession.builder.appName("MostObscureSuperheroes").getOrCreate()

schema = StructType([ \
                     StructField("id", IntegerType(), True), \
                     StructField("name", StringType(), True)])

names = spark.read.schema(schema).option("sep", " ").csv("Data/Marvel+Names")

lines = spark.read.text("Data/Marvel+Graph")

# Small tweak vs. what's shown in the video: we trim whitespace from each line as this
# could throw the counts off by one.
connections = lines.withColumn("id", func.split(func.trim(func.col("value")), " ")[0]) \
    .withColumn("connections", func.size(func.split(func.trim(func.col("value")), " ")) - 1) \
    .groupBy("id").agg(func.sum("connections").alias("connections"))
    
minConnectionCount = connections.agg(func.min("connections")).first()[0]

minConnections = connections.filter(func.col("connections") == minConnectionCount)

minConnectionsWithNames = minConnections.join(names, "id")

print("The following characters have only " + str(minConnectionCount) + " connection(s):")

minConnectionsWithNames.select("name").show()

The following characters have only 0 connection(s):
+--------------------+
|                name|
+--------------------+
|        BERSERKER II|
|              BLARE/|
|MARVEL BOY II/MARTIN|
|MARVEL BOY/MARTIN BU|
|      GIURESCU, RADU|
|       CLUMSY FOULUP|
|              FENRIS|
|              RANDAK|
|           SHARKSKIN|
|     CALLAHAN, DANNY|
|         DEATHCHARGE|
|                RUNE|
|         SEA LEOPARD|
|         RED WOLF II|
|              ZANTOR|
|JOHNSON, LYNDON BAIN|
|          LUNATIK II|
|                KULL|
|GERVASE, LADY ALYSSA|
+--------------------+

