In [0]:
%pyspark

from pyspark.sql import SparkSession

# Reuse the active SparkSession if there is one (getOrCreate), otherwise start one
# named "trip_count_sql". The bare `spark` on the last line displays the session.
spark = (
    SparkSession.builder
    .appName("trip_count_sql")
    .getOrCreate()
)
spark

In [1]:
%pyspark
import os

# Directory that holds the raw data files. Defaults to the original location;
# set the TAXI_DATA_DIR environment variable to point elsewhere without editing the notebook.
# NOTE(review): the files are read through a "file://" URI below, so this path must be
# readable on the local filesystem wherever Spark reads it — confirm for cluster runs.
directory = os.environ.get("TAXI_DATA_DIR", "/home/ubuntu/working/datasource")

# Glob (relative to `directory`) matching the trip CSV files.
trips_files = 'trips/*'
# Taxi zone lookup CSV (relative to `directory`).
zone_file = 'taxi+_zone_lookup.csv'

In [3]:
%pyspark
# Load both raw CSVs from the local filesystem: header=True takes column names
# from the first row; inferSchema=True costs an extra pass over the data to pick column types.
trips_df, zone_df = (
    spark.read.csv(f"file://{directory}/{name}", inferSchema=True, header=True)
    for name in (trips_files, zone_file)
)

In [4]:
%pyspark
# Print the inferred schema of each input to check column names and types before querying.
trips_df.printSchema()
zone_df.printSchema()

# Datalake -> Data Warehouse

- 분석에 필요한 컬럼만 선택하고, zone 테이블과 조인해 위치 ID를 지역 이름으로 변환

In [6]:
%pyspark
# Register the raw DataFrames as temporary views so later cells can query them
# with spark.sql(...) as `trips` and `zone`.
trips_df.createOrReplaceTempView('trips')
zone_df.createOrReplaceTempView('zone')

In [7]:
%pyspark

# Preview the raw trip records through SQL.
query = '''
SELECT * FROM trips
'''

trips_preview_df = spark.sql(query)
z.show(trips_preview_df)

In [8]:
%pyspark

# Preview the zone lookup table through SQL.
query = '''
SELECT * FROM zone
'''

zone_preview_df = spark.sql(query)
z.show(zone_preview_df)


In [10]:
%pyspark

# Warehouse: keep only the trip columns used later, derive the calendar date and
# hour of day from the pickup/dropoff timestamps, and resolve the pickup/dropoff
# location IDs to zone names through the zone lookup.
# LEFT JOINs keep trips whose location ID has no match in `zone` (the zone name is then NULL).
# Note: pickup_time / dropoff_time hold only the hour (HOUR(...)), not a full time of day.
query = """
    SELECT
        t.VendorID as vendor_id,
        TO_DATE(t.tpep_pickup_datetime) as pickup_date,
        TO_DATE(t.tpep_dropoff_datetime) as dropoff_date,
        HOUR(t.tpep_pickup_datetime) as pickup_time,
        HOUR(t.tpep_dropoff_datetime) as dropoff_time,
        
        t.passenger_count,
        t.trip_distance,
        t.fare_amount,
        t.tip_amount,
        t.tolls_amount,
        t.total_amount,
        t.payment_type,
        
        pz.Zone as pickup_zone,
        dz.Zone as dropoff_zone
    FROM trips t
    
    LEFT JOIN zone pz ON t.PULocationID = pz.locationID
    LEFT JOIN zone dz ON t.DOLocationID = dz.locationID
"""

# comb_df plays the role of the warehouse.

comb_df = spark.sql(query)
z.show(comb_df)

# Data Warehouse -> Mart
- 웨어하우스에서 마트를 만들기 위해서는 먼저 데이터를 검사하고 정제해야 한다.


In [12]:
%pyspark

comb_df.createOrReplaceTempView("comb")  # expose the warehouse DataFrame to SQL as `comb`

In [13]:
%pyspark

# 1. Inspect pickup dates and hours.
# HOUR() yields 0-23, so `pickup_time >= 0` only filters out NULL hours.
query = '''
SELECT
    pickup_date,
    pickup_time
FROM comb
WHERE pickup_time >= 0
ORDER BY pickup_date
'''

pickup_dates_df = spark.sql(query)
z.show(pickup_dates_df)



In [14]:
%sql
-- 1. Inspect pickup dates and hours, the same check as the %pyspark paragraph above,
--    run through the %sql interpreter against the `comb` temp view.
SELECT
    pickup_date,
    pickup_time
FROM comb
WHERE pickup_time >= 0
ORDER BY pickup_date

In [15]:
%pyspark

# Count trips whose pickup date falls before 2021-01-01.
query = ''' 
SELECT count(*)
FROM comb
WHERE pickup_date < '2021-01-01'
'''

early_trip_count_df = spark.sql(query)
z.show(early_trip_count_df)

In [16]:
%pyspark
# 2. Inspect fare data: summary statistics (count, mean, stddev, min, max) of total_amount.
comb_df_describe = comb_df.describe('total_amount')
z.show(comb_df_describe)

In [17]:
%pyspark

# 3. Inspect distance data: summary statistics (count, mean, stddev, min, max) of trip_distance.
# Kept in a regular variable, named like the other describe() results in this section.
comb_df_distance = comb_df.select('trip_distance').describe()
z.show(comb_df_distance)

In [18]:
%pyspark
# 4. Check the number of trips per month (DATE_TRUNC to the first day of the month), newest first.
query = '''
SELECT
    DATE_TRUNC('MM', pickup_date) as month,
    COUNT(*) as trips
FROM comb
GROUP BY month
ORDER BY month DESC
'''

monthly_trips_df = spark.sql(query)
z.show(monthly_trips_df)



In [19]:
%pyspark

# 5. Inspect passenger data: summary statistics (count, mean, stddev, min, max) of passenger_count.
comb_df_passenger_cnt = comb_df.describe('passenger_count')
z.show(comb_df_passenger_cnt)

## 살펴본 내용을 토대로 실제 분석할 데이터로 정제: Warehouse => Mart

In [21]:
%pyspark

# Clean the data: keep only rows with 0 < total_amount < 200, passenger_count < 5
# (no lower bound, so 0 passengers is kept), pickup_date on or after 2021-01-01,
# and 0 < trip_distance < 10.
# Rows where any of these columns is NULL are dropped too, because a comparison with
# NULL is never true in a WHERE clause.
# NOTE(review): the thresholds are hard-coded, presumably chosen from the describe()
# outputs above — confirm. There is no upper bound on pickup_date.

query = '''
SELECT *
FROM comb c
WHERE c.total_amount < 200
    AND c.total_amount > 0
    
    AND c.passenger_count < 5
    AND c.pickup_date >= '2021-01-01'
    
    AND c.trip_distance < 10
    AND c.trip_distance > 0
'''

# cleaned_df plays the role of the mart: the data is now ready for analysis.
cleaned_df = spark.sql(query)
cleaned_df.createOrReplaceTempView('cleaned')


In [22]:
%pyspark
z.show(cleaned_df.describe())  # summary statistics of the cleaned data, to confirm the filters took effect

In [23]:
%md
## EDA (탐색적 데이터 분석)


In [24]:
%pyspark
# Number of trips per pickup date, in chronological order.
# COUNT(*) equals COUNT(pickup_date) here: `cleaned` only keeps rows with a non-NULL
# pickup_date (>= '2021-01-01').
query = '''
SELECT
    pickup_date,
    COUNT(*) as trips
FROM cleaned
GROUP BY pickup_date
ORDER BY pickup_date ASC
'''

z.show(spark.sql(query))

In [25]:
%pyspark
# Number of trips per day of week, broken down by month.
# DATE_FORMAT(..., 'EEEE') yields the weekday name. Sorting by that name would be
# alphabetical (Friday, Monday, ...), so rows are ordered by DAYOFWEEK instead
# (1 = Sunday ... 7 = Saturday). Every row in a group shares one weekday, so MIN()
# just picks that value.
query = '''
SELECT
    DATE_TRUNC('MM', pickup_date) as month,
    DATE_FORMAT(pickup_date, 'EEEE') as day_of_week,
    COUNT(*) as trips
FROM cleaned
GROUP BY month, day_of_week
ORDER BY month, MIN(DAYOFWEEK(pickup_date))
'''

z.show(spark.sql(query))

In [26]:
%pyspark
# Payment type analysis: preview the raw numeric payment_type codes.
z.show(cleaned_df.select(cleaned_df['payment_type']))

In [27]:
%pyspark
# Convert the numeric payment type code to a readable string label.
def parse_payment_type(payment_type):
    """Translate a numeric payment type code into its label.

    Args:
        payment_type: Payment type code (1-6 are mapped). May be None when the
            column value is SQL NULL.

    Returns:
        The label for codes 1-6, or None for None or any unmapped code. Returning
        None instead of raising KeyError keeps one unexpected value from failing
        the whole Spark query that calls this function as a UDF.
    """
    payment_type_to_string = {
        1: "Credit Card",
        2: "Cash",
        3: "No Charge",
        4: "Dispute",
        5: "Unknown",
        6: "Voided Trip",
    }

    # dict.get returns None for missing keys, and Spark turns a None UDF result into NULL.
    # Numerically equal keys hash alike, so a float code such as 1.0 still matches 1.
    return payment_type_to_string.get(payment_type)

In [28]:
%pyspark
# Expose parse_payment_type to SQL as a UDF; 'string' spells out the return type
# that Spark would use by default for a plain Python function.
spark.udf.register('parse_payment_type', parse_payment_type, 'string')

In [29]:
%pyspark
# Statistics per payment type: trip count, plus the mean and sample standard deviation
# (STD) of fare_amount. Rows are grouped by the raw code, and the label comes from the
# parse_payment_type UDF. ORDER BY fixes the row order so repeated runs display the same table.
query = '''
SELECT
    parse_payment_type(payment_type) as payment_type_str,
    COUNT(*) as trips,
    MEAN(fare_amount) as mean_fare_amount,
    STD(fare_amount) as std_fare_amount
FROM cleaned
GROUP BY payment_type
ORDER BY payment_type
'''

z.show(spark.sql(query))

In [31]:
%pyspark
# Stop the Spark session; `spark` cannot be used by later cells after this.
# NOTE(review): getOrCreate() may have returned a session shared by the notebook
# server, in which case stopping it affects other paragraphs/notes as well — confirm intended.
spark.stop()