# Spark 사용하기
## vagrant 환경설정
- CMD> vagrant --version
- CMD> vagrant plugin install vagrant-disksize

- //파일명 : Vagrantfile에 붙여넣기

- Vagrant.configure("2") do |config|
  - config.vm.box = "ubuntu/bionic64"  #18.04
  - config.vm.network "forwarded_port", guest: 3000, host: 3000
  - config.vm.network "forwarded_port", guest: 9000, host: 9000    # hadoop
  - config.vm.network "forwarded_port", guest: 9870, host: 9870    # hadoop ui
  - config.vm.network "forwarded_port", guest: 9864, host: 9864    # hadoop
  - config.vm.network "forwarded_port", guest: 9866, host: 9866    # hadoop ui 
  - config.vm.network "forwarded_port", guest: 4000, host: 4000
  - config.vm.network "forwarded_port", guest: 8888, host: 8888    # jupyter notebook
  - config.vm.network "forwarded_port", guest: 7077, host: 7077    # spark master
  - config.vm.network "forwarded_port", guest: 2181, host: 2181    # zookeeper 
  - config.vm.network "forwarded_port", guest: 9092, host: 9092    # kafka
  - config.vm.network "forwarded_port", guest: 4040, host: 4040    # spark ui
  - config.vm.network "forwarded_port", guest: 27017, host: 27017  # mongodb

  - config.vm.network "forwarded_port", guest: 5000, host: 5000    # flask
  - config.vm.network "forwarded_port", guest: 8080, host: 8089    # oracle
  - config.vm.network "forwarded_port", guest: 1521, host: 1522    # oracle

## pyspark 설치
- $ conda install pyspark=2.4.5


## vagrant를 실행
- jupyter notebook을 사용하기 위해 환경설정파일 생성
    - config.vm.network "forwarded_port", guest: 4040, host: 4040  # 추가함
    - 주피터 환경설정파일 생성 $ jupyter notebook --generate-config 
    
- $ nano ~/.jupyter/jupyter_notebook_config.py 접속해서 수정
    - 048라인 : c.NotebookApp.allow_origin = '*'  # 외부 접속 허용하기
    - 204라인 : c.NotebookApp.ip = '10.0.2.15'  #vagrant 사용시  내부 ip로
    - 272라인 : c.NotebookApp.open_browser = False # 시작 시 서버PC에서 주피터 노트북 창이 열릴 필요 없음
    - 292라인 : c.NotebookApp.port = 8888   #포트 설정
    
- $ jupyter notebook : 주피터 노트북 실행

# 라이브러리
- from pyspark.sql import SparkSession # 스파크 사용
- from pyspark.sql.functions import expr #컬럼명 변경할 때
- from pyspark.sql.functions import col #데이터 프레임 컬럼명 사용가능
- from graphframes import GraphFrame #관계형 그래프 적용
- from pyspark.sql.functions import desc    
- import matplotlib.pyplot as plt # 그래프 출력
- import matplotlib.font_manager as fm


### 스파크 버전확인

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]") \
    .enableHiveSupport().appName("hive02") \
    .getOrCreate()

spark.version # 2.4.5

### 데이터 생성

In [None]:
df2 = spark.createDataFrame([(1,'a',10),(2,'b',20),(3,'c',30),(4,'c',40)]).toDF("id","name","age")
df2.printSchema()
df2.show()

### 하둡에 데이터 저장
- hadoop hdfs에 /test1 폴더를 생성함.
    - $ hdfs dfs -mkdir /test1



In [None]:
df2.write.mode("Overwrite").option("header", "true").csv("hdfs://127.0.0.1:9000/test1/df2")

### 저장된 데이터 불러오기

In [None]:
df3 = spark.read.option("header", "true") \
     .csv('hdfs://127.0.0.1:9000/test1/df2')  

df3.printSchema()
df3.show()

### 데이터 프레임을 판다스 구조로 변경

In [None]:
import pandas as pd

# spark dataframe to pandas
df4 = df3.toPandas()
df4

### 판다스를 데이터 프레임 구조로 변경

In [None]:
df5 = spark.createDataFrame(df4)
df5.show()

# Hadoop에 Hive 적용시키기
- hive 폴더 생성
    - $hdfs dfs -mkdir -p /user/hive/warehouse

In [None]:
# hive 적용
spark = SparkSession.builder.master("local[*]") \
        .enableHiveSupport().appName("hive01") \
        .config("spark.datasource.hive.metastore.uris","hdfs://127.0.0.1:9000") \
        .config("spark.sql.warehouse.dir","hdfs://127.0.0.1:9000/user/hive/warehouse") \
        .config("spark.sql.catalogImplementation","hive") \
        .getOrCreate()


## 웹데이터 저장하고 출력하는 방법
- 1. 나의 컴퓨터에서 pyspark 폴더로 들어가 크롬드라이버를 이용해 자료 수집을 하는 파이썬을 만든다.
- 2. 수집된 자료를 mongoDB에 넣는다 # mongoDB에 데이터 넣기전에 포트번호 확인
    - conn = pymongo.MongoClient('127.0.0.1',27017) # 맥은 '192.168.99.100', 32888
	- db = conn.get_database("db1") #없으면 db1생성
	- tb = db.get_collection("t01") #collection 생성
	- tb.insert_many(df.to_dict('records')) 
    
- 3. 4040포트의 환경으로 들어가기
    - cd /home/vagrant/anaconda3/lib/python3.7/site-packages/pyspark/jars # 여기 경로에 밑에 2개 입력
        * wget https://repo1.maven.org/maven2/org/mongodb/spark/mongo-spark-connector_2.11/2.4.1/mongo-spark-connector_2.11-2.4.1.jar
        * wget https://repo1.maven.org/maven2/org/mongodb/mongo-java-driver/3.9.1/mongo-java-driver-3.9.1.jar

### mongoDB에 저장된 데이터 불러오기

In [None]:
spark = SparkSession.builder.master("local[*]").appName("myApp") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/db1.t01") \  # 데이터 가져오기
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1:27017/db1.t02") \ # 데이터 저장
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.4.1') \
    .getOrCreate()


In [None]:
# mongoDB에서 읽기
df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
df.show()

## 관계형 그래프 사용
- $ cd /home/vagrant/anaconda3/lib/python3.7/site-packages/pyspark/jars/ # 해당 경로로 이동
    - wget http://dl.bintray.com/spark-packages/maven/graphframes/graphframes/0.7.0-spark2.4-s_2.11/graphframes-0.7.0-spark2.4-s_2.11.jar # 설치
   


In [None]:
# 적용
spark = SparkSession.builder.master("local[*]") \
    .enableHiveSupport().appName("spark_app1") \
    .config('spark.jars.packages', 'graphframes:graphframes:0.7.0-spark2.4-s_2.11')\ # 관계형 그래프 적용
    .getOrCreate()