<a href="https://colab.research.google.com/github/hansolko/newsEmail/blob/master/pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [9]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 54.3 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=fdde7a0b38dd95728b52023ec75f45baa85c762c659c8ce3eef39affd6c5ff3d
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0


In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
import pyspark.sql.functions as f
from datetime import datetime, date
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DateType

# Day 1.

## Q1.

In [5]:
spark = SparkSession.builder.appName('SparkByExamples').getOrCreate()

In [6]:
data = [("James", "M", 60000), ("Michale", "M", 70000),
        ("Robert", None, 400000), ("Maria", "F", 500000),
        ("Jen", "", None)]
columns = ["name", "gender", 'salary']
df = spark.createDataFrame(data = data, schema = columns)
df.show()

+-------+------+------+
|   name|gender|salary|
+-------+------+------+
|  James|     M| 60000|
|Michale|     M| 70000|
| Robert|  null|400000|
|  Maria|     F|500000|
|    Jen|      |  null|
+-------+------+------+



In [7]:
data2 = [("MEM001", "M", "2021-03-17"),
        ("MEM003", "F", "2021-11-01"),
        ("MEM002", "M", "2021-05-20")]
columns2 = ["name", "gender", 'date']
df2 = spark.createDataFrame(data = data2, schema = columns2)
df2.show()

+------+------+----------+
|  name|gender|      date|
+------+------+----------+
|MEM001|     M|2021-03-17|
|MEM003|     F|2021-11-01|
|MEM002|     M|2021-05-20|
+------+------+----------+



## Q2.

In [8]:
# This function converts the string cell into a date:
func =  udf(lambda x: datetime.strptime(x, '%Y-%m-%d'), DateType())

df3 = df2.withColumn('datetime', func(col('date')))
df3.show()
df3.printSchema()

+------+------+----------+----------+
|  name|gender|      date|  datetime|
+------+------+----------+----------+
|MEM001|     M|2021-03-17|2021-03-17|
|MEM003|     F|2021-11-01|2021-11-01|
|MEM002|     M|2021-05-20|2021-05-20|
+------+------+----------+----------+

root
 |-- name: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- date: string (nullable = true)
 |-- datetime: date (nullable = true)



In [9]:
# This function makes month column by using datetime column
func2 = udf(lambda x: x.month)

df4 = df3.withColumn('month', func2(col('datetime')))
df5 = df4.drop(df4.datetime)   # 사용한 열 삭제
df5.show()

+------+------+----------+-----+
|  name|gender|      date|month|
+------+------+----------+-----+
|MEM001|     M|2021-03-17|    3|
|MEM003|     F|2021-11-01|   11|
|MEM002|     M|2021-05-20|    5|
+------+------+----------+-----+



# Day 2.

## Q1.

In [11]:
spark = SparkSession.builder.appName('SparkByExamples').getOrCreate()

In [12]:
day2_data = [("MEM001", 10, "2021-03-17", 12), 
             ("MEM003", 20, "2021-11-01", 1),
             ("MEM002", 10, "2021-05-20", 45), 
             ("MEM00", 10, "2021-05-20", 32),
             ("MEM00", 10, "2021-05-20", 55),
             ("MEM00", 10, "2021-05-20", 12), 
             ("MEM0", 20, "2021-05-20", 1),
             ("MEM0", 10, "2021-05-20", 45), 
             ("MEM00", 30, "2021-05-20", 32),
             ("MEM00", None, "2021-05-20", 23),
             ("MEM001", 10, "2021-03-17", 23), 
             ("MEM003", 20, "2021-11-01", 12),
             ("MEM002", 10, "2021-05-20", 32), 
             ("MEM00", 30, "2021-05-20", 32),
             ("MEM00", 10, "2021-05-20", 55),
             ("MEM00", 20, "2021-05-20", None), 
             ("MEM0", 20, "2021-05-20", 1),
             ("MEM0", 10, "2021-05-20", 54), 
             ("MEM00", 10, "2021-05-20", 3),
             ("MEM00", 20, "2021-05-20", 53)]
day2_columns = ["회원ID", "gender", 'date', 'age']
day2_df = spark.createDataFrame(data = day2_data, schema = day2_columns)
day2_df.show()

+------+------+----------+----+
|회원ID|gender|      date| age|
+------+------+----------+----+
|MEM001|    10|2021-03-17|  12|
|MEM003|    20|2021-11-01|   1|
|MEM002|    10|2021-05-20|  45|
| MEM00|    10|2021-05-20|  32|
| MEM00|    10|2021-05-20|  55|
| MEM00|    10|2021-05-20|  12|
|  MEM0|    20|2021-05-20|   1|
|  MEM0|    10|2021-05-20|  45|
| MEM00|    30|2021-05-20|  32|
| MEM00|  null|2021-05-20|  23|
|MEM001|    10|2021-03-17|  23|
|MEM003|    20|2021-11-01|  12|
|MEM002|    10|2021-05-20|  32|
| MEM00|    30|2021-05-20|  32|
| MEM00|    10|2021-05-20|  55|
| MEM00|    20|2021-05-20|null|
|  MEM0|    20|2021-05-20|   1|
|  MEM0|    10|2021-05-20|  54|
| MEM00|    10|2021-05-20|   3|
| MEM00|    20|2021-05-20|  53|
+------+------+----------+----+



## Q2.

In [13]:
# Q2
def new_gender_generator(x):
    if x == 10:
        return "여"
    elif x == 20:
        return "남"
    else:
        return "정보없음"

def new_age_generator(x):
    if x == None:
        return "정보없음"
    elif x < 10:
        return "정보없음"
    elif x < 20:
        return "10대"
    elif x < 30:
        return "20대"
    elif x < 40:
        return "30대"
    else:
        return "40대 이상"


gender_udf =  udf(lambda x: new_gender_generator(x))
age_udf =  udf(lambda x: new_age_generator(x))

# 적용
day2_df2 = day2_df.withColumn('new_gender', gender_udf(col('gender')))
day2_df3 = day2_df2.withColumn('new_age', age_udf(col('age')))

day2_df3.show()
day2_df3.printSchema()

+------+------+----------+----+----------+---------+
|회원ID|gender|      date| age|new_gender|  new_age|
+------+------+----------+----+----------+---------+
|MEM001|    10|2021-03-17|  12|        여|     10대|
|MEM003|    20|2021-11-01|   1|        남| 정보없음|
|MEM002|    10|2021-05-20|  45|        여|40대 이상|
| MEM00|    10|2021-05-20|  32|        여|     30대|
| MEM00|    10|2021-05-20|  55|        여|40대 이상|
| MEM00|    10|2021-05-20|  12|        여|     10대|
|  MEM0|    20|2021-05-20|   1|        남| 정보없음|
|  MEM0|    10|2021-05-20|  45|        여|40대 이상|
| MEM00|    30|2021-05-20|  32|  정보없음|     30대|
| MEM00|  null|2021-05-20|  23|  정보없음|     20대|
|MEM001|    10|2021-03-17|  23|        여|     20대|
|MEM003|    20|2021-11-01|  12|        남|     10대|
|MEM002|    10|2021-05-20|  32|        여|     30대|
| MEM00|    30|2021-05-20|  32|  정보없음|     30대|
| MEM00|    10|2021-05-20|  55|        여|40대 이상|
| MEM00|    20|2021-05-20|null|        남| 정보없음|
|  MEM0|    20|2021-05-20|   1|        남| 정보없음|
|  ME

# Day3.

## Q1.

In [14]:
spark = SparkSession.builder.appName('SparkByExamples').getOrCreate()

In [15]:
day3_data = [("MEM001","2021-03-17"), 
             ("MEM003","2021-11-01"),
             ("MEM002","2021-05-20"), 
             ("MEM00","2021-05-20")]
day3_columns = ["회원ID", 'date']
day3_df = spark.createDataFrame(data = day3_data, schema = day3_columns)
day3_df.show()

+------+----------+
|회원ID|      date|
+------+----------+
|MEM001|2021-03-17|
|MEM003|2021-11-01|
|MEM002|2021-05-20|
| MEM00|2021-05-20|
+------+----------+



## Q2. INNER JOIN TEST

In [16]:
day2_df3.show()
day3_df.show()

+------+------+----------+----+----------+---------+
|회원ID|gender|      date| age|new_gender|  new_age|
+------+------+----------+----+----------+---------+
|MEM001|    10|2021-03-17|  12|        여|     10대|
|MEM003|    20|2021-11-01|   1|        남| 정보없음|
|MEM002|    10|2021-05-20|  45|        여|40대 이상|
| MEM00|    10|2021-05-20|  32|        여|     30대|
| MEM00|    10|2021-05-20|  55|        여|40대 이상|
| MEM00|    10|2021-05-20|  12|        여|     10대|
|  MEM0|    20|2021-05-20|   1|        남| 정보없음|
|  MEM0|    10|2021-05-20|  45|        여|40대 이상|
| MEM00|    30|2021-05-20|  32|  정보없음|     30대|
| MEM00|  null|2021-05-20|  23|  정보없음|     20대|
|MEM001|    10|2021-03-17|  23|        여|     20대|
|MEM003|    20|2021-11-01|  12|        남|     10대|
|MEM002|    10|2021-05-20|  32|        여|     30대|
| MEM00|    30|2021-05-20|  32|  정보없음|     30대|
| MEM00|    10|2021-05-20|  55|        여|40대 이상|
| MEM00|    20|2021-05-20|null|        남| 정보없음|
|  MEM0|    20|2021-05-20|   1|        남| 정보없음|
|  ME

In [17]:
# join test (사실 할 필요 없음)
inner_join = day2_df3.join(day3_df, day2_df3.date == day3_df.date, how = 'inner')
inner_join.show()

+------+------+----------+---+----------+---------+------+----------+
|회원ID|gender|      date|age|new_gender|  new_age|회원ID|      date|
+------+------+----------+---+----------+---------+------+----------+
|MEM001|    10|2021-03-17| 12|        여|     10대|MEM001|2021-03-17|
|MEM001|    10|2021-03-17| 23|        여|     20대|MEM001|2021-03-17|
|MEM002|    10|2021-05-20| 45|        여|40대 이상|MEM002|2021-05-20|
|MEM002|    10|2021-05-20| 45|        여|40대 이상| MEM00|2021-05-20|
| MEM00|    10|2021-05-20| 32|        여|     30대|MEM002|2021-05-20|
| MEM00|    10|2021-05-20| 32|        여|     30대| MEM00|2021-05-20|
| MEM00|    10|2021-05-20| 55|        여|40대 이상|MEM002|2021-05-20|
| MEM00|    10|2021-05-20| 55|        여|40대 이상| MEM00|2021-05-20|
| MEM00|    10|2021-05-20| 12|        여|     10대|MEM002|2021-05-20|
| MEM00|    10|2021-05-20| 12|        여|     10대| MEM00|2021-05-20|
|  MEM0|    20|2021-05-20|  1|        남| 정보없음|MEM002|2021-05-20|
|  MEM0|    20|2021-05-20|  1|        남| 정보없음| MEM00|2021

In [18]:
# groupby
inner_join_by_gender_age = inner_join.groupby(["new_gender", "new_age"]).count()
inner_join_by_gender_age.show()

+----------+---------+-----+
|new_gender|  new_age|count|
+----------+---------+-----+
|        남|     10대|    1|
|        여| 정보없음|    2|
|        여|     20대|    1|
|  정보없음|     20대|    2|
|        남| 정보없음|    7|
|        여|     30대|    4|
|        남|40대 이상|    2|
|        여|40대 이상|   10|
|  정보없음|     30대|    4|
|        여|     10대|    3|
+----------+---------+-----+



## Q3.

In [19]:
# 열 이름 '활동고객수'로 변경
df_spark = inner_join_by_gender_age.select(
    col('new_gender'),
    col('new_age'),
    col('count').alias('활동고객수') 
)
df_spark.show()

+----------+---------+----------+
|new_gender|  new_age|활동고객수|
+----------+---------+----------+
|        남|     10대|         1|
|        여| 정보없음|         2|
|        여|     20대|         1|
|  정보없음|     20대|         2|
|        남| 정보없음|         7|
|        여|     30대|         4|
|        남|40대 이상|         2|
|        여|40대 이상|        10|
|  정보없음|     30대|         4|
|        여|     10대|         3|
+----------+---------+----------+



## Q4.

In [20]:
# sorting
sort_by_gender = df_spark.sort(df_spark.new_gender.asc())
sort_by_gender_age = sort_by_gender.sort(sort_by_gender.new_age.asc())
sort_by_gender_age.show()

+----------+---------+----------+
|new_gender|  new_age|활동고객수|
+----------+---------+----------+
|        남|     10대|         1|
|        여|     10대|         3|
|        여|     20대|         1|
|  정보없음|     20대|         2|
|        여|     30대|         4|
|  정보없음|     30대|         4|
|        여|40대 이상|        10|
|        남|40대 이상|         2|
|        여| 정보없음|         2|
|        남| 정보없음|         7|
+----------+---------+----------+

