In [1]:
!pip install pyspark --quiet

[K     |████████████████████████████████| 281.4 MB 37 kB/s 
[K     |████████████████████████████████| 198 kB 52.6 MB/s 
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("Basics")
# getOrCreate makes this cell safe to re-run: a plain SparkContext(conf=conf)
# raises when a context is already active in this kernel. If one is active it
# is returned as-is, and `conf` is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

# Popular Movie - With RDD

Count the ratings for each movie ID with the RDD API, then list the 10 most-rated movies.

In [None]:
# Each input line holds: user_id,movie_id,rating,timestamp (comma-separated).
lines = sc.textFile("/content/drive/MyDrive/RTA/Pyspark/data/u_data.txt")
# One (movie_id, 1) pair per rating, so the 1s can be summed per movie below.
movies = (
    lines
    .map(lambda line: line.split(","))
    .map(lambda fields: (int(fields[1]), 1))
)

In [None]:
# RDD.top compares whole (movie_id, 1) tuples, so this previews the five
# largest movie IDs, not the most popular movies.
movies.top(num=5)

[(1661, 1), (1660, 1), (1657, 1), (1656, 1), (1655, 1)]

In [None]:
# Add up the 1s per movie id -> (movie_id, rating_count).
moviesCounts = movies.reduceByKey(lambda left, right: left + right)
# Still ordered by the tuple (movie_id first), i.e. the highest ids; the
# popularity ranking happens in the next cells.
moviesCounts.top(5)

[(1661, 1), (1660, 1), (1657, 1), (1656, 1), (1655, 1)]

In [None]:
# Swap each pair to (rating_count, movie_id) so sortByKey ranks by count.
flipped = moviesCounts.map(lambda pair: pair[::-1])
sortedMovies = flipped.sortByKey(False)  # False -> descending

In [None]:
# Only the 10 most-rated movies are shown, so fetch just those instead of
# collect()-ing every (count, movie_id) pair to the driver and counting by hand.
results = sortedMovies.take(10)

for result in results:
    print(result)

(301, 50)
(279, 258)
(268, 100)
(266, 288)
(259, 181)
(241, 286)
(239, 294)
(225, 1)
(220, 7)
(220, 56)


# Popular Movie - With DataFrame

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
# Entry point for the DataFrame and SQL APIs; getOrCreate returns an already
# active session rather than building a second one.
spark = (
    SparkSession.builder
    .appName("Dataframe")
    .getOrCreate()
)

In [None]:
# No header row and no schema: Spark names the columns _c0.._c3
# (user_id, movie_id, rating, timestamp in this file).
movies = spark.read.csv("/content/drive/MyDrive/RTA/Pyspark/data/u_data.txt")

In [None]:
# Ratings per movie id (_c1), most-rated first.
movie_counts = (
    movies
    .groupBy("_c1")
    .count()
    .sort("count", ascending=False)
)

In [None]:
# Display only the ten most-rated movies.
movie_counts.show(n=10)

+---+-----+
|_c1|count|
+---+-----+
| 50|  301|
|258|  279|
|100|  268|
|288|  266|
|181|  259|
|286|  241|
|294|  239|
|  1|  225|
|  7|  220|
| 56|  220|
+---+-----+
only showing top 10 rows



# Popular Movie - With Spark SQL

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
spark = (
    SparkSession.builder
    .appName("Dataframe")
    .getOrCreate()
)

# Re-read the ratings file so this section does not depend on earlier cells.
movies = spark.read.csv("/content/drive/MyDrive/RTA/Pyspark/data/u_data.txt")

# Expose the DataFrame to spark.sql() under the name movies_table.
movies.createOrReplaceTempView("movies_table")

In [7]:
# SQL version of the DataFrame ranking: ratings per movie id, most-rated first.
popular_movies_sql = """
select _c1,count(*) as count
from movies_table 
group by _c1
order by count DESC
"""
spark.sql(popular_movies_sql).show(n=10)

+---+-----+
|_c1|count|
+---+-----+
| 50|  301|
|258|  279|
|100|  268|
|288|  266|
|181|  259|
|286|  241|
|294|  239|
|  1|  225|
|  7|  220|
| 56|  220|
+---+-----+
only showing top 10 rows



# Creating a UDF

In [9]:
from pyspark.sql.types import *
def am_sum(s):
  """Double a movie id; used as the `am_sum` SQL UDF.

  Despite its name this does not sum anything: it converts ``s`` to ``int``
  and multiplies it by 2.

  Args:
    s: the ``_c1`` value handed over by Spark SQL (a string here, since the
      CSV is read without a schema), or ``None`` for a SQL NULL.

  Returns:
    ``int(s) * 2``, or ``None`` when ``s`` is ``None``. NULL ids then stay
    NULL instead of raising ``TypeError`` and failing the whole query.
  """
  if s is None:
    return None
  return int(s) * 2

# Make the Python function callable by name from SQL, returning a BIGINT.
spark.udf.register("am_sum", am_sum, LongType())

doubled_ids_sql = """
select _c1,count(*) as count, am_sum(_c1)
from movies_table 
group by _c1
order by count DESC
"""
spark.sql(doubled_ids_sql).show(n=10)

+---+-----+-----------+
|_c1|count|am_sum(_c1)|
+---+-----+-----------+
| 50|  301|        100|
|258|  279|        516|
|100|  268|        200|
|288|  266|        576|
|181|  259|        362|
|286|  241|        572|
|294|  239|        588|
|  1|  225|          2|
|  7|  220|         14|
| 56|  220|        112|
+---+-----+-----------+
only showing top 10 rows



# Creating a Pandas UDF

In [10]:
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import col, pandas_udf
def am_sum1(s: pd.Series) -> pd.Series:
  """Vectorized counterpart of ``am_sum`` for use as a scalar pandas UDF.

  The ``pd.Series -> pd.Series`` type hints are what mark this as a
  Series-to-Series pandas UDF, so they must stay.

  Args:
    s: one batch of ``_c1`` values (strings in this notebook, because the
      CSV is read without a schema).

  Returns:
    An int64 Series of the same length holding ``int(value) * 2``.

  Raises:
    ValueError: if a value is not an integer literal (same as ``int()``).
    TypeError: if the batch contains ``None`` (same as ``int(None)``).
  """
  # Convert the whole batch in one cast instead of calling int() per element
  # through Series.apply, which would throw away the point of a pandas UDF.
  return s.astype("int64") * 2

# Wrap the Series -> Series function as a pandas UDF producing BIGINT values.
am_sum_udf = pandas_udf(am_sum1, LongType())

In [11]:
# Apply the pandas UDF next to the raw movie id. NOTE(review): despite the
# name, movie_counts now holds ids and doubled ids, not rating counts.
movie_counts = movies.select(col("_c1"), am_sum_udf(col("_c1")))
movie_counts.show(n=10)

+---+------------+
|_c1|am_sum1(_c1)|
+---+------------+
|242|         484|
|302|         604|
|377|         754|
| 51|         102|
|346|         692|
|474|         948|
|265|         530|
|465|         930|
|451|         902|
| 86|         172|
+---+------------+
only showing top 10 rows



# Higher-Order Functions
## transform, filter, exists, reduce

Only `transform` is demonstrated below.

In [17]:
from pyspark.sql import Row

# Small in-memory table with an array column to exercise higher-order functions.
schema = "author STRING, title STRING, pages INT, link ARRAY<INT>"
rows = [
    Row("author4", "title4", 500, [1, 2, 3]),
    Row("author5", "title5", 300, [1, 2, 3]),
    Row("author6", "title6", 200, [1, 2]),
]
books_df = spark.createDataFrame(rows, schema=schema)
# Register it for the SQL cells below under the same name.
books_df.createOrReplaceTempView("books_df")
books_df.show()

+-------+------+-----+---------+
| author| title|pages|     link|
+-------+------+-----+---------+
|author4|title4|  500|[1, 2, 3]|
|author5|title5|  300|[1, 2, 3]|
|author6|title6|  200|   [1, 2]|
+-------+------+-----+---------+



In [20]:
from pyspark.sql import Row
# transform() applies the lambda to every element of the `link` array.
transform_sql = """
select author,pages, transform(link,t->(t*2)) as transformed
from books_df 
"""
spark.sql(transform_sql).show(n=10)

+-------+-----+-----------+
| author|pages|transformed|
+-------+-----+-----------+
|author4|  500|  [2, 4, 6]|
|author5|  300|  [2, 4, 6]|
|author6|  200|     [2, 4]|
+-------+-----+-----------+



# Explode

In [23]:
# explode() emits one output row per element of the `link` array.
explode_sql = """
select author,pages, explode(link) as link
from books_df 
"""
spark.sql(explode_sql).show(n=10)

+-------+-----+----+
| author|pages|link|
+-------+-----+----+
|author4|  500|   1|
|author4|  500|   2|
|author4|  500|   3|
|author5|  300|   1|
|author5|  300|   2|
|author5|  300|   3|
|author6|  200|   1|
|author6|  200|   2|
+-------+-----+----+



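# Join in PySpark

_TODO: no join example yet._

# Join in Spark SQL

_TODO: no join example yet. The cells below only inspect the Spark catalog (databases, tables, columns)._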

In [23]:
# List the databases visible to this session (only `default` in the output below).
spark.catalog.listDatabases() 

[Database(name='default', description='default database', locationUri='file:/content/spark-warehouse')]

In [24]:
# Tables and temp views in the current database; temp views such as
# movies_table are reported with isTemporary=True.
spark.catalog.listTables(dbName=None)

[Table(name='movies_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [None]:
# Column metadata for the movies_table temp view.
spark.catalog.listColumns(tableName="movies_table")

In [27]:
# A new DataFrame over the registered view; despite the name, no data is copied.
movies_copy = spark.read.table("movies_table")
movies_copy.show(n=5)

+---+---+---+---------+
|_c0|_c1|_c2|      _c3|
+---+---+---+---------+
|196|242|  3|881250949|
|186|302|  3|891717742|
| 22|377|  1|878887116|
|244| 51|  2|880606923|
|166|346|  1|886397596|
+---+---+---+---------+
only showing top 5 rows

