In [13]:
import json
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.window import Window
from pyspark.sql.functions import expr, hour, count, max, col, length
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from functools import reduce
import pandas as pd

In [14]:
import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark

# Locate the local Spark installation (SPARK_HOME) and make its pyspark
# package importable before pyspark is imported below.
findspark.init()
findspark.find()

import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import DataFrame, SparkSession
from typing import List

# NOTE(review): the first cell already imports from pyspark before
# findspark.init() runs, so on a fresh kernel that cell only works when
# pyspark is importable without findspark (e.g. pip-installed) — consider
# running findspark.init() before any pyspark import.

# getOrCreate() reuses an already-active session instead of creating a new one.
spark = (
    SparkSession.builder
    .appName("Our First Spark Example")
    .getOrCreate()
)

spark

In [15]:

# Feedback categories treated as praise by top_driver_feedback_category.
driver_feedback_categories_good = [
    "great service", "nice car", "wonderful companion",
    "neat and tidy", "expert navigation", "recommend",
]
# Feedback categories treated as complaints by top_complaint_feedback_category.
# NOTE: some complaints contain a praise phrase as a substring
# ("not recommend" contains "recommend", "non-expert navigation" contains
# "expert navigation"), so substring matching against the good list also
# matches these complaints.
driver_feedback_categories_bad = [
    "awful service", "bad car", "unpleasant companion",
    "dirty", "non-expert navigation", "not recommend",
]


In [16]:
def top_k_drivers(df: DataFrame, k: int):
    """Return the ``k`` highest-rated rides by ``driver_rate``.

    ``driver_rate`` is declared as StringType in this notebook's schema, so
    ordering it directly is lexicographic ("10" sorts below "9", and any
    non-numeric text sorts above every digit). The rating is therefore cast
    to double for filtering and ordering, while the original column value is
    kept in the output so the emitted records keep their format.

    NOTE(review): ``df`` appears to hold one row per ride, so a driver with
    several highly rated rides can appear more than once. If a per-driver
    ranking is intended, aggregate (e.g. average) by ``driver_id`` first.

    Args:
        df: DataFrame with ``driver_id`` and ``driver_rate`` columns.
        k: maximum number of records to return.

    Returns:
        RDD of ``{"driver_id": ..., "driver_rate": ...}`` dicts, best first.
        Rows whose rating is missing or not numeric are skipped.
    """
    rate = col("driver_rate").cast("double")
    return (
        df.where(rate.isNotNull())
        .orderBy(rate.desc())
        .select("driver_id", "driver_rate")
        .limit(k)
        .rdd.map(lambda row: {"driver_id": row[0], "driver_rate": row[1]})
    )


def top_k_clients(df: DataFrame, k: int):
    """Return the ``k`` highest-rated rides by ``client_rate``.

    Previously the result was always capped at 100 rows regardless of ``k``,
    so e.g. ``top_k_clients(df, 50)`` returned 100 records.

    ``client_rate`` is declared as StringType in this notebook's schema, so it
    is cast to double for filtering and ordering (string ordering would be
    lexicographic). The original column value is kept in the output.

    NOTE(review): ``df`` appears to hold one row per ride, so a client can
    appear more than once; aggregate by ``client_id`` first if a per-client
    ranking is intended.

    Args:
        df: DataFrame with ``client_id`` and ``client_rate`` columns.
        k: maximum number of records to return.

    Returns:
        RDD of ``{"client_id": ..., "client_rate": ...}`` dicts, best first.
        Rows whose rating is missing or not numeric are skipped.
    """
    rate = col("client_rate").cast("double")
    return (
        df.where(rate.isNotNull())
        .orderBy(rate.desc())
        .select("client_id", "client_rate")
        .limit(k)
        .rdd.map(lambda row: {"client_id": row[0], "client_rate": row[1]})
    )


def top_k_drivers_by_profit(df: DataFrame, k: int):
    """Return the ``k`` drivers with the largest total ride ``cost``.

    Args:
        df: DataFrame with ``driver_id`` and ``cost`` columns.
        k: maximum number of drivers to return.

    Returns:
        RDD of ``{"driver_id": ..., "profit": ...}`` dicts, highest profit
        first, where ``profit`` is the sum of ``cost`` over the driver's rows.
    """
    # Python's builtin ``sum`` cannot aggregate a Spark Column (it raises
    # "Column is not iterable"), and Spark's ``sum`` is not imported by name
    # in this notebook, so import it explicitly under a distinct name.
    from pyspark.sql.functions import sum as spark_sum

    return (
        df.groupBy("driver_id")
        # Sum the fares as-is (DoubleType in the schema). The old per-ride
        # cast("int") truncated every fare before adding them up, and its
        # ``df.cost`` reference pointed at the pre-cast column.
        .agg(spark_sum("cost").alias("profit"))
        .orderBy("profit", ascending=False)
        .select("driver_id", "profit")
        .limit(k)
        .rdd.map(lambda row: {"driver_id": row[0], "profit": row[1]})
    )


def worst_drivers(df: DataFrame, k: int = 100, threshold: float = 3.5):
    """Return up to ``k`` lowest-rated rides with ``driver_rate`` below ``threshold``.

    ``driver_rate`` is declared as StringType in this notebook's schema; it is
    cast to double explicitly so that both the threshold comparison and the
    ordering are numeric rather than lexicographic. The original column value
    is kept in the output.

    Args:
        df: DataFrame with ``driver_id`` and ``driver_rate`` columns.
        k: maximum number of records to return (default 100, as before).
        threshold: ratings strictly below this value are returned (default 3.5).

    Returns:
        RDD of ``{"driver_id": ..., "driver_rate": ...}`` dicts, worst first.
        Rows whose rating is missing or not numeric are skipped, because the
        comparison against ``threshold`` is null for them.
    """
    rate = col("driver_rate").cast("double")
    return (
        df.where(rate < threshold)
        .orderBy(rate.asc())
        .select("driver_id", "driver_rate")
        .limit(k)
        .rdd.map(lambda row: {"driver_id": row[0], "driver_rate": row[1]})
    )


def top_10_longest_text_comment(df: DataFrame):
    """Return the 10 rows with the longest ``text_driver_feedback``.

    Rows are ranked by the character length of the feedback text, longest
    first; rows with no feedback text sort last.

    Returns:
        RDD of dicts with keys ``driver_id``, ``driver_rate`` and
        ``text_driver_feedback``.
    """
    feedback_length = length(df.text_driver_feedback)
    longest = (
        df.orderBy(feedback_length.desc())
        .select("driver_id", "driver_rate", "text_driver_feedback")
        .limit(10)
    )
    # The selected column names are exactly the output keys, in order.
    return longest.rdd.map(lambda row: row.asDict())


def top_driver_feedback_category(df: DataFrame):
    """Return the most frequent praising ``category_driver_feedback`` value.

    A row counts as praise when its category contains one of
    ``driver_feedback_categories_good`` and none of
    ``driver_feedback_categories_bad``. The exclusion is required because
    some complaint phrases contain a praise phrase ("not recommend"
    contains "recommend", "non-expert navigation" contains
    "expert navigation"). With substring matching alone, a complaint could be
    reported as the top praised category.

    Args:
        df: DataFrame with a ``category_driver_feedback`` column.

    Returns:
        RDD holding at most one ``{"category_driver_feedback": ...}`` dict.
    """
    feedback = df.category_driver_feedback

    def _mentions_any(phrases):
        # OR together literal substring tests (no LIKE wildcards involved).
        return reduce(lambda a, b: a | b, (feedback.contains(p) for p in phrases))

    is_praise = _mentions_any(driver_feedback_categories_good) & ~_mentions_any(
        driver_feedback_categories_bad
    )
    return (
        df.where(is_praise)
        .groupBy("category_driver_feedback")
        .count()
        # Break count ties by category name so the result is deterministic.
        .orderBy(col("count").desc(), col("category_driver_feedback"))
        .select("category_driver_feedback")
        .limit(1)
        .rdd.map(lambda row: {"category_driver_feedback": row[0]})
    )


def top_complaint_feedback_category(df: DataFrame):
    """Return the most frequent complaint ``category_driver_feedback`` value.

    A row counts as a complaint when its category matches SQL
    ``LIKE '%<phrase>%'`` for any phrase in ``driver_feedback_categories_bad``.

    Returns:
        RDD holding at most one ``{"category_driver_feedback": ...}`` dict.
    """
    patterns = ["%" + category + "%" for category in driver_feedback_categories_bad]
    is_complaint = reduce(
        lambda acc, cond: acc | cond,
        [df.category_driver_feedback.like(pattern) for pattern in patterns],
    )
    counts = df.where(is_complaint).groupBy("category_driver_feedback").count()
    top = (
        counts.orderBy("count", ascending=False)
        .select("category_driver_feedback")
        .limit(1)
    )
    return top.rdd.map(lambda row: {"category_driver_feedback": row[0]})


def top_night_riders(df: DataFrame, k: int):
    """Return the ``k`` drivers with the largest share of night-time rides.

    A ride counts as "night" when the hour of its ``start_time`` is 0-6
    inclusive. The previous ``hour > 0`` test silently dropped rides starting
    in the midnight hour. Rides with a null ``start_time`` fall into the
    ``else`` branch and count as day.

    Args:
        df: DataFrame with ``driver_id``, ``client_id`` and ``start_time``.
        k: maximum number of drivers to return.

    Returns:
        RDD of ``{"driver_id", "night_rides", "pct_rides"}`` dicts ordered by
        ``pct_rides`` descending.
        - ``night_rides`` is the driver's count of night rides.
        - ``pct_rides`` is night rides divided by all of the driver's rides,
          both counted over non-null ``client_id``.
        Drivers with no night rides are not listed.
    """
    rides = df.withColumn("hour", hour(df.start_time)).withColumn(
        "daytime", expr("case when hour >= 0 and hour < 7 then 'night' else 'day' end")
    )
    by_driver = Window.partitionBy("driver_id")
    by_driver_daytime = Window.partitionBy("driver_id", "daytime")
    rides = (
        rides.withColumn("driver_rides", count(col("client_id")).over(by_driver))
        .withColumn("hour_rides", count(col("client_id")).over(by_driver_daytime))
        .withColumn("pct_rides", col("hour_rides") / col("driver_rides"))
    )
    return (
        rides.where(col("daytime") == "night")
        .select("driver_id", "hour_rides", "pct_rides")
        # All night rows of one driver carry identical counts, so keep one per
        # driver. Deduplicate BEFORE sorting: dropDuplicates shuffles the data
        # and does not preserve a preceding orderBy. That previously let
        # limit(k) return arbitrary drivers instead of the top ones.
        .dropDuplicates(["driver_id"])
        # Secondary keys make ties deterministic.
        .orderBy(col("pct_rides").desc(), col("hour_rides").desc(), col("driver_id"))
        .limit(k)
        .rdd.map(
            lambda row: {
                "driver_id": row[0],
                "night_rides": row[1],
                "pct_rides": row[2],
            }
        )
    )


def densest_traffic_by_hour(df: DataFrame):
    """Return the start hour with the most rides.

    Returns:
        RDD with at most one ``{"hour": "<h>-<h+1>", "count_rides": n}`` dict.
        The upper bound wraps at midnight, so hour 23 is reported as "23-0".
        The previous ``(h + 1) // 24`` produced e.g. "17-0" for hour 17.
        Rides without a ``start_time`` are ignored; a null-hour group would
        otherwise crash the label formatting.
    """
    rides = df.withColumn("hour", hour(df.start_time))
    return (
        rides.where(col("hour").isNotNull())
        .groupBy("hour")
        .agg(count(col("driver_id")).alias("count_rides"))
        .orderBy("count_rides", ascending=False)
        .select("hour", "count_rides")
        .limit(1)
        .rdd.map(
            lambda row: {"hour": f"{row[0]}-{(row[0] + 1) % 24}", "count_rides": row[1]}
        )
    )

In [17]:
# Explicit schema for rides_new.csv as (column name, Spark type, nullable).
# NOTE(review): driver_rate / client_rate hold ratings but are declared
# StringType, so ordering them without a cast is lexicographic.
# Column names (including the "longtitude" spelling) are left as-is.
schema = StructType(
    [
        StructField(name, data_type, nullable)
        for name, data_type, nullable in [
            ("driver_id", IntegerType(), False),
            ("client_id", IntegerType(), False),
            ("start", StringType(), False),
            ("start_latitude", DoubleType(), False),
            ("start_longtitude", DoubleType(), False),
            ("finish", StringType(), False),
            ("finish_latitude", DoubleType(), False),
            ("finish_longtitude", DoubleType(), False),
            ("distance", DoubleType(), False),
            ("road_time", DoubleType(), False),
            ("start_time", TimestampType(), False),
            ("finish_time", TimestampType(), False),
            ("cost", DoubleType(), False),
            ("driver_rate", StringType(), True),
            ("category_driver_feedback", StringType(), True),
            ("text_driver_feedback", StringType(), True),
            ("client_rate", StringType(), True),
            ("category_client_feedback", StringType(), True),
            ("text_client_feedback", StringType(), True),
        ]
    ]
)

# The first CSV line is a header; columns are typed by the schema above.
df = spark.read.csv("rides_new.csv", header=True, schema=schema)

In [18]:
# Write every report to data/<name>.json as a JSON array of records.
# Builders are lambdas so each Spark job runs only when its report is
# written, in the order listed. The "variant N" labels are the task variants.
reports = {
    "top_100_drivers": lambda: top_k_drivers(df, 100),  # variant 1
    "worst_drivers": lambda: worst_drivers(df),  # variant 2
    "densest_traffic_by_hour": lambda: densest_traffic_by_hour(df),  # variant 3
    "top_50_clients": lambda: top_k_clients(df, 50),  # variant 4
    "top_night_riders": lambda: top_night_riders(df, 50),  # variant 6
    "top_praised_driver_category": lambda: top_driver_feedback_category(df),  # variant 7
    "top_complaint_driver_category": lambda: top_complaint_feedback_category(df),  # variant 8
    "top_10_longest_text_comments": lambda: top_10_longest_text_comment(df),
}

# Create the output directory so a fresh checkout / Restart & Run All does not
# fail with FileNotFoundError on the first open().
os.makedirs("data", exist_ok=True)

for report_name, build_report in reports.items():
    # Collect before opening the file: a failing Spark job then leaves the
    # previous report intact instead of an empty, truncated file.
    records = build_report().collect()
    with open(f"data/{report_name}.json", "w") as f:
        json.dump(records, f)

In [19]:
# Shut down the Spark session created in the setup cell; run this cell last,
# since no Spark DataFrame operations can run after it.
spark.stop()