In [1]:
!pip install pyspark==3.3.1 py4j==0.10.9.5

Collecting pyspark==3.3.1
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845493 sha256=4da51e49f3892f5682b8da2b3143dcf40072392733d35c38bcad2affa1378f09
  Stored in directory: /root/.cache/pip/wheels/0f/f0/3d/517368b8ce80486e84f89f214e0a022554e4ee64969f46279b
Successfully built pyspark
Installing collected packages: py4j, pyspark
  Attempting uninstall: py4j
    Found existing installation: py4j 0.10.9.7
    Uninst

In [2]:
!cd /usr/local/lib/python3.10/dist-packages/pyspark/jars && wget https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/2.1.0.28/redshift-jdbc42-2.1.0.28.jar

--2024-06-18 10:01:17--  https://s3.amazonaws.com/redshift-downloads/drivers/jdbc/2.1.0.28/redshift-jdbc42-2.1.0.28.jar
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.216.39.8, 52.216.60.48, 52.216.184.213, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.216.39.8|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1077030 (1.0M) [application/java-archive]
Saving to: ‘redshift-jdbc42-2.1.0.28.jar’


2024-06-18 10:01:19 (966 KB/s) - ‘redshift-jdbc42-2.1.0.28.jar’ saved [1077030/1077030]



In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame #5") \
    .getOrCreate()

## Redshift와 연결해서 테이블들을 데이터프레임으로 로딩하기

In [4]:
df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=****") \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

In [5]:
df_session_timestamp = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=****") \
    .option("dbtable", "raw_data.session_timestamp") \
    .load()

In [6]:
df_user_session_channel.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- sessionid: string (nullable = true)
 |-- channel: string (nullable = true)



In [8]:
df_user_session_channel.rdd.getNumPartitions()

1

In [7]:
df_session_timestamp.printSchema()

root
 |-- sessionid: string (nullable = true)
 |-- ts: timestamp (nullable = true)



In [9]:
df_session_timestamp.rdd.getNumPartitions()

1

## DataFrame으로 처리하기

In [10]:
join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")

In [11]:
session_df.printSchema()

root
 |-- userid: integer (nullable = true)
 |-- sessionid: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- sessionid: string (nullable = true)
 |-- ts: timestamp (nullable = true)



In [12]:
session_df.show(5)

+------+--------------------+--------+--------------------+--------------------+
|userid|           sessionid| channel|           sessionid|                  ts|
+------+--------------------+--------+--------------------+--------------------+
|  1501|0135456d6a3c1051f...|  Google|0135456d6a3c1051f...|2019-09-24 14:49:...|
|   243|0226aa5193c66d990...|  Google|0226aa5193c66d990...|2019-07-01 23:04:...|
|   876|01a416a7e28d0d229...|Facebook|01a416a7e28d0d229...|2019-05-26 14:23:...|
|  2776|029bf49b584c641f0...|Facebook|029bf49b584c641f0...|2019-11-11 20:37:...|
|   939|02b8d6c2775b756de...|  Google|02b8d6c2775b756de...|2019-09-01 15:29:...|
+------+--------------------+--------+--------------------+--------------------+
only showing top 5 rows



In [14]:
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner").select(
    "userid", df_user_session_channel.sessionid, "channel", "ts"
)

In [15]:
channel_count_df = session_df.groupby("channel").count().orderBy("count", ascending=False)

In [16]:
channel_count_df.show()

+---------+-----+
|  channel|count|
+---------+-----+
|  Youtube|17091|
|   Google|16982|
|    Naver|16921|
|  Organic|16904|
|Instagram|16831|
| Facebook|16791|
+---------+-----+



In [17]:
from pyspark.sql.functions import date_format, asc, countDistinct

session_df.withColumn('month', date_format('ts', 'yyyy-MM')).groupby('month').\
    agg(countDistinct("userid").alias("mau")).sort(desc('month')).show()

+-------+---+
|  month|mau|
+-------+---+
|2019-05|281|
|2019-06|459|
|2019-07|623|
|2019-08|662|
|2019-09|639|
|2019-10|763|
|2019-11|721|
+-------+---+



## Spark SQL로  처리하기

In [18]:
df_user_session_channel.createOrReplaceTempView("user_session_channel")

In [19]:
df_session_timestamp.createOrReplaceTempView("session_timestamp")

In [20]:
channel_count_df = spark.sql("""
    SELECT channel, count(distinct userId) uniqueUsers
    FROM session_timestamp st
    JOIN user_session_channel usc ON st.sessionID = usc.sessionID
    GROUP BY 1
    ORDER BY 1
""")

In [21]:
channel_count_df

DataFrame[channel: string, uniqueUsers: bigint]

In [22]:
channel_count_df.show()

+---------+-----------+
|  channel|uniqueUsers|
+---------+-----------+
| Facebook|        889|
|   Google|        893|
|Instagram|        895|
|    Naver|        882|
|  Organic|        895|
|  Youtube|        889|
+---------+-----------+



In [23]:
mau_df = spark.sql("""
SELECT
  LEFT(A.ts, 7) AS month,
  COUNT(DISTINCT B.userid) AS mau
FROM session_timestamp A
JOIN user_session_channel B ON A.sessionid = B.sessionid
GROUP BY 1
ORDER BY 1 DESC""")

In [24]:
mau_df.collect()

[Row(month='2019-11', mau=721),
 Row(month='2019-10', mau=763),
 Row(month='2019-09', mau=639),
 Row(month='2019-08', mau=662),
 Row(month='2019-07', mau=623),
 Row(month='2019-06', mau=459),
 Row(month='2019-05', mau=281)]

## MAU (Monthly Active User) 계산

In [26]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, asc, countDistinct

spark = SparkSession \
    .builder \
    .appName("PySpark DataFrame #5") \
    .getOrCreate()

df_user_session_channel = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=****") \
    .option("dbtable", "raw_data.user_session_channel") \
    .load()

df_session_timestamp = spark.read \
    .format("jdbc") \
    .option("driver", "com.amazon.redshift.jdbc42.Driver") \
    .option("url", "jdbc:redshift://learnde.cduaw970ssvt.ap-northeast-2.redshift.amazonaws.com:5439/dev?user=guest&password=****") \
    .option("dbtable", "raw_data.session_timestamp") \
    .load()

join_expr = df_user_session_channel.sessionid == df_session_timestamp.sessionid
session_df = df_user_session_channel.join(df_session_timestamp, join_expr, "inner")

session_df.withColumn('month', date_format('ts', 'yyyy-MM')).groupby('month').\
    agg(countDistinct("userid").alias("mau")).sort(asc('month')).show()

+-------+---+
|  month|mau|
+-------+---+
|2019-05|281|
|2019-06|459|
|2019-07|623|
|2019-08|662|
|2019-09|639|
|2019-10|763|
|2019-11|721|
+-------+---+

