Here, we will solve the problem in two ways:
1. First, using PySpark DataFrame functions
2. Second, using Spark SQL

In [1]:
# Load the required libraries (the Spark session itself is started in the next cell)
from pyspark.sql import SparkSession

In [2]:
# Start the Spark session for this notebook, or reuse one that already exists.
spark = SparkSession.builder.appName("problem8").getOrCreate()
# A SparkSession is already the entry point for running SQL, so `sqlContext`
# is simply an alias for it. The SparkSession constructor expects a
# SparkContext, not another session, so it must not be called as
# SparkSession(spark); that form only works by accident of private attributes.
sqlContext = spark
# Log errors only; suppress WARN-level messages in the cell outputs.
spark.sparkContext.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/02/23 22:25:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/02/23 22:25:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/02/23 22:25:40 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [3]:
# Read ride_log.csv into a DataFrame: the first row supplies the column names
# and Spark infers each column's type from the data.
ridelogdf = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("ride_log.csv")
)

                                                                                

In [4]:
# Print the schema of the ride-log DataFrame to confirm the column names and
# the types Spark inferred while reading the CSV.
ridelogdf.printSchema()

root
 |-- id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- distance: integer (nullable = true)



In [5]:
# Read user.csv into a DataFrame: the first row supplies the column names
# and Spark infers each column's type from the data.
userdf = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load("user.csv")
)

In [6]:
# Print the schema of the user DataFrame to confirm the column names and
# the types Spark inferred while reading the CSV.
userdf.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)



In [10]:
# Now solve the same problem using Spark SQL.
# Register both DataFrames as temporary views (these are temp views, not Hive
# tables) so the SQL queries below can refer to them by name.
ridelogdf.createOrReplaceTempView("tmpRidelog")
userdf.createOrReplaceTempView("tmpUser")

In [11]:
# Preview the ride-log view; show() prints only the first 20 rows by default.
ridelog_preview = sqlContext.sql("SELECT * FROM tmpRidelog")
ridelog_preview.show()

+---+-------+--------+
| id|user_id|distance|
+---+-------+--------+
|101|      8|      93|
|102|     40|      56|
|103|     28|      83|
|104|     33|      83|
|105|      1|      87|
|106|     32|      49|
|107|      3|       5|
|108|     23|      37|
|109|     31|      62|
|110|      1|      35|
|111|     41|      89|
|112|     19|      64|
|113|     49|      57|
|114|     28|      68|
|115|     48|      94|
|116|     50|      89|
|117|     48|      29|
|118|     13|      16|
|119|     24|      58|
|120|     25|      19|
+---+-------+--------+
only showing top 20 rows



In [12]:
# Preview the user view; show() prints only the first 20 rows by default.
user_preview = sqlContext.sql("SELECT * FROM tmpUser")
user_preview.show()

+---+-------------------+
| id|               name|
+---+-------------------+
|  1|       Dustin Smith|
|  2|        Jay Ramirez|
|  3|       Joseph Cooke|
|  4|      Melinda Young|
|  5|        Sean Parker|
|  6|         Ian Foster|
|  7|Christopher Schmitt|
|  8|  Patrick Gutierrez|
|  9|     Dennis Douglas|
| 10|      Brenda Morris|
| 11|  Jeffery Hernandez|
| 12|         David Rice|
| 13|     Charles Foster|
| 14|    Keith Perez DVM|
| 15|        Dean Cuevas|
| 16|     Melissa Bishop|
| 17|   Alexander Howell|
| 18|   Austin Robertson|
| 19|    Sherri Mcdaniel|
| 20|       Nancy Nguyen|
+---+-------------------+
only showing top 20 rows



In [32]:
# Total distance per user, ranked from the smallest total to the largest.
# RANK() gives tied totals the same rank and then skips the next rank(s).
# The SQL is written as a triple-quoted string instead of backslash line
# continuations, which silently break if whitespace follows a backslash or a
# separating space is forgotten. The explicit ORDER BY guarantees the rows are
# displayed in rank order rather than relying on the window's internal sort.
sqlContext.sql("""
    SELECT user_id
         , name
         , SUM(distance) AS total
         , RANK() OVER (ORDER BY SUM(distance)) AS actualrank
    FROM tmpRidelog AS log
    LEFT OUTER JOIN tmpUser AS users
        ON log.user_id = users.id
    GROUP BY user_id, name
    ORDER BY actualrank
""").show()

+-------+-------------------+-----+----------+
|user_id|               name|total|actualrank|
+-------+-------------------+-----+----------+
|      3|       Joseph Cooke|    5|         1|
|     45|   Benjamin Mcbride|   11|         2|
|     13|     Charles Foster|   16|         3|
|     18|   Austin Robertson|   27|         4|
|     36|        Alyssa Shaw|   28|         5|
|     37|      Destiny Clark|   48|         6|
|     40|       Stacy Bryant|   56|         7|
|     19|    Sherri Mcdaniel|   64|         8|
|     23|    Joseph Hamilton|   79|         9|
|     21|        Melody Ball|   81|        10|
|     39|          Mark Diaz|   81|        10|
|     38|        Thomas Lara|   82|        12|
|     33|        Donna Ortiz|   83|        13|
|     31|      Shannon Green|   86|        14|
|     41|        Howard Rose|   89|        15|
|     10|      Brenda Morris|   90|        16|
|     27|   Jacqueline Heath|   91|        17|
|      5|        Sean Parker|   92|        18|
|      7|Chri

In [27]:
# Users whose total distance is among the 10 lowest-ranked totals.
# NOTE: the window orders ascending, so rank 1 is the SMALLEST total and this
# query lists the least-travelled users. RANK() keeps every user tied at
# rank 10, so more than 10 rows can be returned.
# The SQL is written as a triple-quoted string instead of backslash line
# continuations (fragile with trailing whitespace), and the outer ORDER BY
# guarantees the rows are displayed from the smallest total upwards.
sqlContext.sql("""
    SELECT q.user_id, q.name, q.total
    FROM (
        SELECT user_id
             , name
             , SUM(distance) AS total
             , RANK() OVER (ORDER BY SUM(distance)) AS actualrank
        FROM tmpRidelog AS log
        LEFT OUTER JOIN tmpUser AS users
            ON log.user_id = users.id
        GROUP BY user_id, name
    ) AS q
    WHERE q.actualrank <= 10
    ORDER BY q.total
""").show()

+-------+----------------+-----+
|user_id|            name|total|
+-------+----------------+-----+
|      3|    Joseph Cooke|    5|
|     45|Benjamin Mcbride|   11|
|     13|  Charles Foster|   16|
|     18|Austin Robertson|   27|
|     36|     Alyssa Shaw|   28|
|     37|   Destiny Clark|   48|
|     40|    Stacy Bryant|   56|
|     19| Sherri Mcdaniel|   64|
|     23| Joseph Hamilton|   79|
|     39|       Mark Diaz|   81|
|     21|     Melody Ball|   81|
+-------+----------------+-----+



In [28]:
# Top users by total distance travelled: rank users by their summed distance
# in descending order and keep ranks 1-10. RANK() keeps every user tied at
# rank 10, so more than 10 rows are possible when totals tie.
# The SQL is written as a triple-quoted string instead of backslash line
# continuations (fragile with trailing whitespace), and the outer ORDER BY
# guarantees the rows are displayed from the largest total downwards.
sqlContext.sql("""
    SELECT q.user_id, q.name, q.total
    FROM (
        SELECT user_id
             , name
             , SUM(distance) AS total
             , RANK() OVER (ORDER BY SUM(distance) DESC) AS actualrank
        FROM tmpRidelog AS log
        LEFT OUTER JOIN tmpUser AS users
            ON log.user_id = users.id
        GROUP BY user_id, name
    ) AS q
    WHERE q.actualrank <= 10
    ORDER BY q.total DESC
""").show()

+-------+-----------------+-----+
|user_id|             name|total|
+-------+-----------------+-----+
|     47|  Christina Price|  328|
|     34| Jennifer Simmons|  277|
|     43|  Kimberly Potter|  275|
|      8|Patrick Gutierrez|  243|
|     25|     Crystal Berg|  239|
|     14|  Keith Perez DVM|  214|
|     32|    Stacy Collins|  210|
|     11|Jeffery Hernandez|  206|
|      9|   Dennis Douglas|  206|
|     17| Alexander Howell|  205|
+-------+-----------------+-----+

