## Colab Exercise Overview

- Install the pyspark and Py4J packages.
- Py4J lets a Python program access objects that live in a Java virtual machine.
- Use a local standalone Spark.

In [None]:
!pip install pyspark==3.0.1 py4j==0.10.9 
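
After the install cell runs, it can help to confirm that the pinned versions are the ones the runtime actually sees. The following is a minimal sketch using the standard library's `importlib.metadata`; the helper name `installed_version` is hypothetical and not part of PySpark or Py4J.

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of `package`, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Versions pinned in the install cell above.
expected = {"pyspark": "3.0.1", "py4j": "0.10.9"}

for name, wanted in expected.items():
    found = installed_version(name)
    status = "ok" if found == wanted else "mismatch"
    print(f"{name}: expected {wanted}, found {found} ({status})")
```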

### Installing the Redshift JDBC driver jar

In [None]:
!cd /usr/local/lib/python3.8/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/1.2.20.1043/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/usr/local/lib/python3.8/dist-packages/pyspark/jars/RedshiftJDBC42-no-awssdk-1.2.20.1043.jar") \
    .getOrCreate()
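
The download cell and the `spark.jars` setting both hard-code `python3.8` in the path, which only matches a runtime that runs Python 3.8. One possible way to avoid that, sketched below, is to resolve the directory from wherever pyspark is installed. The helper `pyspark_jars_dir` is hypothetical, and it assumes the pip-installed package keeps its jars in a `jars` subdirectory, as the paths above suggest.

```python
import importlib.util
import os


def pyspark_jars_dir():
    """Return the `jars` directory of the installed pyspark package, or None."""
    spec = importlib.util.find_spec("pyspark")
    if spec is None or not spec.submodule_search_locations:
        return None  # pyspark is not installed in this environment
    package_dir = list(spec.submodule_search_locations)[0]
    return os.path.join(package_dir, "jars")


JAR_NAME = "RedshiftJDBC42-no-awssdk-1.2.20.1043.jar"

jars_dir = pyspark_jars_dir()
if jars_dir is None:
    print("pyspark not found; run the install cell first")
else:
    # This path could be passed to .config("spark.jars", ...) instead of the literal.
    print(os.path.join(jars_dir, JAR_NAME))
```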

# **A Taste of SparkSQL**

In [None]:
import pandas as pd

namegender_pd = pd.read_csv("https://s3-geospatial.s3-us-west-2.amazonaws.com/name_gender.csv")

In [None]:
namegender_pd.head()

In [None]:
namegender_pd.groupby(["gender"]).count()

In [None]:
namegender_df = spark.createDataFrame(namegender_pd)

In [None]:
namegender_df.printSchema()

In [None]:
namegender_df.show()

In [None]:
namegender_df.groupBy(["gender"]).count().collect()

In [None]:
# https://towardsdatascience.com/pyspark-and-sparksql-basics-6cb4bf967e53

Register the DataFrame as a temporary view and query it with SparkSQL

In [None]:
namegender_df.createOrReplaceTempView("namegender")

In [None]:
namegender_group_df = spark.sql("SELECT gender, count(1) FROM namegender GROUP BY 1")

In [None]:
namegender_group_df.collect()
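
In the query above, `GROUP BY 1` refers to the first output column (`gender`) by position, and `count(1)` counts the rows in each group. As a rough illustration of the shape of the result, here is the same aggregation in plain Python over a few invented rows; the names and gender labels are made-up sample data, not the contents of `name_gender.csv`.

```python
from collections import Counter

# Invented sample rows in (name, gender) form.
rows = [
    ("Adaleigh", "F"),
    ("Amryn", "Unisex"),
    ("Benton", "M"),
    ("Briona", "F"),
    ("Cael", "M"),
]

# Equivalent of: SELECT gender, count(1) FROM namegender GROUP BY 1
gender_counts = Counter(gender for _, gender in rows)

for gender, n in sorted(gender_counts.items()):
    print(gender, n)
```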

Connect to Redshift and load tables into DataFrames

In [None]:
df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234") \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

In [None]:
df_session_timestamp = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=Guest1234") \
    .option("dbtable", "raw_data.session_timestamp") \
    .load()
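
The two read cells differ only in the table name, so the shared options could be built once. The sketch below is one way to do that; `redshift_jdbc_options` is a hypothetical helper, and it assumes the driver accepts the credentials through Spark's `user` and `password` JDBC options instead of the URL query string.

```python
def redshift_jdbc_options(host, database, table, user, password, port=5439):
    """Build the option dict for a Spark JDBC read from Redshift."""
    return {
        "driver": "com.amazon.redshift.jdbc42.Driver",
        "url": f"jdbc:redshift://{host}:{port}/{database}",
        "dbtable": table,
        "user": user,          # assumption: passed as an option, not in the URL
        "password": password,  # assumption: passed as an option, not in the URL
    }


HOST = "learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com"

opts = redshift_jdbc_options(
    HOST, "dev", "raw_data.user_session_channel", "guest", "Guest1234"
)
print(opts["url"])

# With a live SparkSession named `spark`, the read would look like:
#   df = spark.read.format("jdbc").options(**opts).load()
```

Keeping the credentials in separate options also makes it easier to load them from an environment variable or a secrets store later.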

In [None]:
df_user_session_channel.createOrReplaceTempView("user_session_channel")

In [None]:
df_session_timestamp.createOrReplaceTempView("session_timestamp")

In [None]:
channel_count_df = spark.sql("""
    SELECT channel, count(distinct userId) uniqueUsers
    FROM session_timestamp st
    JOIN user_session_channel usc ON st.sessionID = usc.sessionID
    GROUP BY 1
    ORDER BY 1
""")

In [None]:
channel_count_df

In [None]:
channel_count_df.show()
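
To see what the join and `count(distinct userId)` compute without a Redshift connection, the same SQL text can be run against a tiny in-memory SQLite database. This is only a stand-in for Spark, not the notebook's method: the rows are invented, and the `ts` column of `session_timestamp` is an assumed name.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE user_session_channel (userId INTEGER, sessionId TEXT, channel TEXT);
    CREATE TABLE session_timestamp (sessionId TEXT, ts TEXT);

    -- Invented rows: user 1 has two Google sessions, user 2 uses two channels.
    INSERT INTO user_session_channel VALUES
        (1, 's1', 'Google'), (1, 's2', 'Google'), (2, 's3', 'Google'),
        (2, 's4', 'Naver'), (3, 's5', 'Youtube');
    INSERT INTO session_timestamp VALUES
        ('s1', '2019-05-01 00:00:00'), ('s2', '2019-05-02 00:00:00'),
        ('s3', '2019-05-03 00:00:00'), ('s4', '2019-05-04 00:00:00'),
        ('s5', '2019-05-05 00:00:00');
""")

# Same query text as the Spark cell above.
result = conn.execute("""
    SELECT channel, count(distinct userId) uniqueUsers
    FROM session_timestamp st
    JOIN user_session_channel usc ON st.sessionID = usc.sessionID
    GROUP BY 1
    ORDER BY 1
""").fetchall()
conn.close()

print(result)  # [('Google', 2), ('Naver', 1), ('Youtube', 1)]
```

User 1 appears in two Google sessions but is counted once, which is the effect of `distinct`.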

In [None]:
channel_with_o_count_df = spark.sql("""
    SELECT COUNT(1)
    FROM user_session_channel
    WHERE channel like '%o%'
""")

In [None]:
channel_with_o_count_df.collect()
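
The pattern `'%o%'` matches any channel that contains a lowercase `o`, because `LIKE` in Spark SQL is case-sensitive. The sketch below mimics that matching in plain Python by translating a pattern to a regular expression. `like_to_regex` is a hypothetical helper that ignores escape characters, and the channel names are invented sample values.

```python
import re


def like_to_regex(pattern):
    """Translate a SQL LIKE pattern into a compiled regex (no escape handling)."""
    parts = []
    for ch in pattern:
        if ch == "%":
            parts.append(".*")   # % matches any run of characters
        elif ch == "_":
            parts.append(".")    # _ matches exactly one character
        else:
            parts.append(re.escape(ch))
    return re.compile("".join(parts), re.DOTALL)


matcher = like_to_regex("%o%")

# Invented sample values.
channels = ["Google", "Naver", "Youtube", "Facebook", "ORGANIC"]
matched = [c for c in channels if matcher.fullmatch(c)]

print(matched)       # ['Google', 'Youtube', 'Facebook']
print(len(matched))  # 3
```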