In [1]:
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StringType,IntegerType
from pyspark.sql.functions import udf,lit,col,concat
from pyspark.sql import functions as F
from pyspark import SparkContext, SparkConf

sc = SparkContext()
sqlContext = SQLContext(sc)


ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=PySparkShell, master=local[*]) created by <module> at /home/hadoop/venv/lib/python3.5/site-packages/IPython/utils/py3compat.py:188 

In [2]:
# hdfs:// 뒤에 자신의 HADOOP PATH를 넣으면 데이터 불러올 수 있다.
chart_rdd = sc.textFile("hdfs://192.168.0.17:9000/user/hadoop/data_tab.csv")

chart_rdd = chart_rdd.map(lambda x: x.split('\t'))
chart_rows = chart_rdd.map(lambda x: Row(year=x[1], month=x[2], rank=x[3], title=x[4], artist=x[5],
                                      album=x[6], genre=x[7], date=x[8], likes=x[9]))
chart_df = sqlContext.createDataFrame(chart_rows) #rdd > DF

In [3]:
singer_rdd = sc.textFile("hdfs://192.168.0.17:9000/user/hadoop/singer_tab.csv")
singer_rdd.collect()
singer_rdd = singer_rdd.map(lambda x: x.split('\t'))
singer_rows = singer_rdd.map(lambda x: Row(name=x[1], gender=x[2], type=x[3], likes=x[4]))
                                  
singer_df = sqlContext.createDataFrame(singer_rows) #rdd > DF
singer_df.toPandas()

Unnamed: 0,gender,likes,name,type
0,남성,1937,갓츄 (GOT U),그룹
1,남성,22241,2PM,그룹
2,남성,4007,엠투엠 (M To M),솔로
3,여성,4177,효민,솔로
4,혼성,10065,코요태,그룹
5,남성,92,최준영,솔로
6,여성,12458,PRISTIN (프리스틴),그룹
7,여성,14961,써니힐,그룹
8,여성,909,김보아,솔로
9,남성,51578,양요섭,솔로


In [4]:
# 차트 유지 기간을 파생 변수로 사용하기 위해 발매일로 필터링

# 2011년 1월 1일 이후 발매 곡
after2011 = chart_df.filter(chart_df.date >= "2011.01.01")

In [5]:
#join
joined_df = after2011.join(singer_df, after2011.artist==singer_df.name).select(after2011.year, after2011.month, after2011.rank, after2011.title,after2011.genre, after2011.artist, after2011.date, singer_df.gender, singer_df.type)
joined_df= joined_df.withColumn("rank", col("rank").cast("integer"))
joined_df= joined_df.withColumn("month", col("month").cast("integer"))
joined_df= joined_df.withColumn("year", col("year").cast("integer"))
joined_df = joined_df.sort('year','month','rank')
joined_df.toPandas()

Unnamed: 0,year,month,rank,title,genre,artist,date,gender,type
0,2011,1,5,샤이보이,댄스,시크릿,2011.01.06,여성,그룹
1,2011,1,7,나쁜 남자,발라드,JOO,2011.01.04,여성,솔로
2,2011,1,13,왜,댄스,동방신기 (TVXQ!),2011.01.05,남성,그룹
3,2011,1,17,처음 사랑하는 연인들을 위해,발라드,정용화 (CNBLUE),2011.01.14,남성,솔로
4,2011,1,34,Supa Dupa Diva,댄스,달샤벳,2011.01.04,여성,그룹
5,2011,1,35,라떼 한잔,발라드,가비엔제이,2011.01.12,여성,그룹
6,2011,1,40,Stay,댄스,엠블랙,2011.01.10,남성,그룹
7,2011,1,41,사랑해 그리고 기억해,발라드,먼데이 키즈 (Monday Kiz),2011.01.05,남성,솔로
8,2011,1,43,어쩌라고,댄스,승리,2011.01.20,남성,솔로
9,2011,1,44,기가 차,발라드,케이윌,2011.01.21,남성,솔로


In [6]:
import json

def rec_to_actions(df, INDEX, TYPE, cnt):
    idx = cnt
    print(idx)
    lst = []
    for record in df.to_dict(orient="records"):
        lst.append(('{ "index" : { "_index" : "%s", "_type" : "%s", "_id" : %d }}'% (INDEX, TYPE,idx)))
        lst.append(json.dumps(record, default=str, ensure_ascii=False))
        idx += 1
    return lst

In [8]:
##### 연도별로 인기 있는 아티스트의 형태(그룹/솔로) #####

years = [2011,2012,2013,2014,2015,2016,2017,2018,2019,2020]
lst = []
lst2 = []
cnt=0
cnt2=0
for year in years:
    df_year = joined_df.filter(joined_df.year == year)
    
    # 1. 그룹화 => 아티스트, 노래, 순위, 해당 순위 유지 개월 수
    # SQL
    df_year.createOrReplaceTempView('df_year')
    count_sql = """
    SELECT title, artist, rank, gender, type, COUNT(*) AS cnt
    FROM df_year
    GROUP BY title, artist, rank, gender, type
    """

    count_by_rank_df = sqlContext.sql(count_sql)

    # 2. 각 노래의 순위, 해당 순위 유지 개월 수 => 노래 별 점수 구하기
    # score = (101-순위) * 개월 수
    score_by_rank_df = count_by_rank_df.withColumn('score', (101 - F.col('rank'))*F.col('cnt'))
    score_by_rank_df = score_by_rank_df.drop('rank').drop('cnt').groupBy('title','artist', 'gender','type').agg(F.sum('score'))
    score_by_rank_df = score_by_rank_df.withColumnRenamed('sum(score)', 'score')

    # 3. 아티스트별 점수 구하기
    artist_score_df = score_by_rank_df.groupBy('artist','gender','type').agg(F.sum('score'))
    artist_score_df = artist_score_df.withColumnRenamed('sum(score)', 'score')
    artist_score_df = artist_score_df.withColumn('year',lit(year))
    artist_score_df = artist_score_df.sort('score',ascending=False)
    #print(artist_score_df.toPandas()[:10])
    result2 = rec_to_actions(artist_score_df.toPandas()[:10],"top_artist_by_year", "t_a_b_y", cnt2)
    lst2 += result2
    cnt2 += 10
    
    # 4. gender & type & score(평균) 매칭
    gender_type_result = artist_score_df.groupBy('gender','type').agg(F.mean('score'))
    gender_type_result = gender_type_result.withColumn('year',lit(year))
    gender_type_result = gender_type_result.sort('avg(score)', ascending=False)
    #print(gender_type_result.toPandas())

    result = rec_to_actions(gender_type_result.toPandas(),"artist_by_year", "a_b_y", cnt)
    lst += result
    cnt += gender_type_result.count()
    
dest_file = './elk_data/artist_by_year.json'
output_file = open(dest_file, 'w', encoding='utf-8')
for l in lst:
    print(eval(l))
    json.dump(eval(l), output_file, ensure_ascii=False) 
    output_file.write("\n")

dest_file2 = './elk_data/top_artist_by_year.json'
output_file = open(dest_file2, 'w', encoding='utf-8')
for l in lst2:
    print(eval(l))
    json.dump(eval(l), output_file, ensure_ascii=False) 
    output_file.write("\n")


0
10
20
30
40
50
60
70
80
90
{'index': {'_id': 0, '_type': 't_a_b_y', '_index': 'top_artist_by_year'}}
{'artist': 'BIGBANG', 'type': '그룹', 'year': 2011, 'score': 1729, 'gender': '남성'}
{'index': {'_id': 1, '_type': 't_a_b_y', '_index': 'top_artist_by_year'}}
{'artist': '리쌍', 'type': '그룹', 'year': 2011, 'score': 1559, 'gender': '남성'}
{'index': {'_id': 2, '_type': 't_a_b_y', '_index': 'top_artist_by_year'}}
{'artist': '아이유', 'type': '솔로', 'year': 2011, 'score': 1557, 'gender': '여성'}
{'index': {'_id': 3, '_type': 't_a_b_y', '_index': 'top_artist_by_year'}}
{'artist': '김범수', 'type': '솔로', 'year': 2011, 'score': 1431, 'gender': '남성'}
{'index': {'_id': 4, '_type': 't_a_b_y', '_index': 'top_artist_by_year'}}
{'artist': '2NE1', 'type': '그룹', 'year': 2011, 'score': 1286, 'gender': '여성'}
{'index': {'_id': 5, '_type': 't_a_b_y', '_index': 'top_artist_by_year'}}
{'artist': '버스커 버스커', 'type': '그룹', 'year': 2011, 'score': 1206, 'gender': '남성'}
{'index': {'_id': 6, '_type': 't_a_b_y', '_index': 'top_a

In [None]:
##### 계절별로 인기 있는 아티스트의 형태(그룹/솔로) #####
seasons = [[3,4,5,'spring'],[6,7,8,'summer'],[9,10,11,'fall'],[12,1,2,'winter']]
lst = []
lst2 = []
cnt=0
cnt2 = 0
for s in seasons:
    #month로 계절 필터링
    df_season = joined_df.filter(joined_df.month.isin(s))
        
    # SQL
    df_season.createOrReplaceTempView('df_season')
    count_sql2 = """
    SELECT title, artist, rank, gender, type, year, COUNT(*) AS cnt
    FROM df_season
    GROUP BY title, artist, rank, gender, type, year
    """

    season_count_by_rank_df = sqlContext.sql(count_sql2)

    # 2. 각 노래의 순위, 해당 순위 유지 개월 수 => 노래 별 점수 구하기
    # score = (101-순위) * 개월 수
    score_by_rank_df = season_count_by_rank_df.withColumn('score', (101 - F.col('rank'))*F.col('cnt'))
    score_by_rank_df = score_by_rank_df.drop('rank').drop('cnt').groupBy('artist', 'gender','type','year').agg(F.sum('score'))
    score_by_rank_df = score_by_rank_df.withColumnRenamed('sum(score)', 'score')
    score_by_rank_df = score_by_rank_df.sort('year','score',ascending=False)
    #print(score_by_rank_df.toPandas())
    
    # 3. 아티스트별 & 연도별 점수 구하기
    season_result = score_by_rank_df.groupBy('gender','type','year').agg(F.avg('score'))
    season_result = season_result.withColumnRenamed('avg(score)', 'score')
    season_result = season_result.sort('year','score',ascending=False)
    season_result = season_result.withColumn('season',lit(s[3]))
    #print(season_result.toPandas())
    result = rec_to_actions(season_result.toPandas(),"artist_by_year_season", "a_b_y_s", cnt)
    lst += result
    cnt += season_result.count()
    #season_result.toPandas().to_json('result_artist_by_year_'+s[3]+'.json', force_ascii=False, orient="records")
    copied_df = season_result
    copied_df = copied_df.withColumn("artist", concat(col("gender"),lit(' '),col("type"))).drop(col("gender")).drop(col("type"))
    result2 = rec_to_actions(copied_df.toPandas(),"artist_gendertype_merged", "a_gt_m", cnt2)
    lst2 += result2
    cnt2 += copied_df.count()

dest_file = './elk_data/artist_by_year_season.json'
output_file = open(dest_file, 'w', encoding='utf-8')
for l in lst:
    print(eval(l))
    json.dump(eval(l), output_file, ensure_ascii=False) 
    output_file.write("\n")

dest_file2 = './elk_data/artist_gendertype_merged.json'
output_file = open(dest_file2, 'w', encoding='utf-8')
for l in lst2:
    print(eval(l))
    json.dump(eval(l), output_file, ensure_ascii=False) 
    output_file.write("\n")

In [None]:
##### 연도/계절별로 인기 있는 장르  #####
seasons = [[3,4,5,'spring'],[6,7,8,'summer'],[9,10,11,'fall'],[12,1,2,'winter']]
lst = []
cnt=0
for s in seasons:
    #month로 계절 필터링
    df_genre = joined_df.filter(joined_df.month.isin(s))
        
    # SQL
    df_genre.createOrReplaceTempView('df_genre')
    count_sql2 = """
    SELECT title, artist, rank, genre, year, COUNT(*) AS cnt
    FROM df_genre
    GROUP BY title, artist, rank, genre, year
    """
    season_genre_df = sqlContext.sql(count_sql2)
    #print(season_genre_df.toPandas())

    # 2. 각 노래의 순위, 해당 순위 유지 개월 수 => 노래 별 점수 구하기
    # score = (101-순위) * 개월 수
    genre_score_df = season_genre_df.withColumn('score', (101 - F.col('rank'))*F.col('cnt'))
    genre_score_df = genre_score_df.drop('rank').drop('cnt').groupBy('genre','year','title','artist').agg(F.sum('score'))
    genre_score_df = genre_score_df.withColumnRenamed('sum(score)', 'score')
    #print(genre_score_df.toPandas())

    # 3. 장르별 점수 구하기
    genre_rank = genre_score_df.groupBy('genre','year').agg(F.avg('score'))
    genre_rank = genre_rank.withColumnRenamed('avg(score)', 'score')
    genre_rank = genre_rank.sort('year','score',ascending=False)
    genre_rank = genre_rank.withColumn('season',lit(s[3]))
    result = rec_to_actions(genre_rank.toPandas(),"genre_by_year_season", "g_b_y_s", cnt)
    lst += result
    cnt += genre_rank.count()
    #print(genre_rank.toPandas())
    #genre_rank.toPandas().to_json('result_genre_year_'+s[3]+'.json', force_ascii=False, orient="records")
    
dest_file = './elk_data/genre_by_year_season.json'
output_file = open(dest_file, 'w', encoding='utf-8')
for l in lst:
    print(eval(l))
    json.dump(eval(l), output_file, ensure_ascii=False) 
    output_file.write("\n")