In [1]:
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName('Python Spark SQL basic example')\
        .config('spark.some.config.option', 'some-value')\
        .getOrCreate()

### Create json file using spark
# sparkContext로 객체 생성
sc = spark.sparkContext

# json 파일 읽어들이기
path = '/home/hadoop/prj2/people.json'
peopleDF = spark.read.json(path)

# printSchema()로 json파일의 스키마 형태 볼수 있음
peopleDF.printSchema()

[Stage 0:>                                                          (0 + 1) / 1]

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



                                                                                

In [2]:
# 데이터프레임을 사용하는 임시의 view(가상의 테이블) 생성
peopleDF.createOrReplaceTempView("people")

# spark에서 제공하는 sql 메소드를 이용해 쿼리 날리기
# 쿼리문에서 people 테이블은 위에서 만들었던 view 테이블임!
teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()

+------+
|  name|
+------+
|Justin|
+------+



In [4]:
# json 파일 읽어들이기
path = '/home/hadoop/prj2/people.json'
df = spark.read.json(path)

# Global Temporary View 생성
df.createOrReplaceGlobalTempView('people')

# 'global_temp' 라는 키워드 꼭 붙여주자!
sqlDF = spark.sql('SELECT * FROM global_temp.people')
sqlDF.show()

22/04/01 08:03:42 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/04/01 08:03:42 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
22/04/01 08:03:47 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
22/04/01 08:03:47 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore hadoop@127.0.1.1
22/04/01 08:03:47 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
22/04/01 08:03:48 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
# 또한 데이터프레임은 RDD[String] 자료구조를 이용해서 json 데이터셋을 데이터프레임으로 만들 수 있음
jsonStrings = ['{"name": "Yin", "address":{"city":"Columbus", "state":"Ohio"}}']
# json -> RDD형식으로 만들기
otherPeopleRDD = sc.parallelize(jsonStrings)
# json파일 읽어오기
otherPeople = spark.read.json(otherPeopleRDD)
otherPeople.show()

                                                                                

+----------------+----+
|         address|name|
+----------------+----+
|[Columbus, Ohio]| Yin|
+----------------+----+



In [7]:
# json 파일 읽어들이기
path = '/home/hadoop/prj2/people.json'
df = spark.read.json(path)

# name 칼럼 select 해서 살펴보기
df.select('name').show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [8]:
# json 파일 읽어들이기
path = '/home/hadoop/prj2/people.json'
df = spark.read.json(path)

# name 칼럼 select 해서 살펴보기
df.select(df['name'], df['age']+1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [10]:
# json 파일 읽어들이기
path = '/home/hadoop/prj2/people.json'
df = spark.read.json(path)

# age가 20보다 큰 데이터만 추출
df.filter(df['age'] > 20).show()
# age 칼럼으로 그룹핑 하고 데이터의 개수를 집계해줌
df.groupBy('age').count().show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



                                                                                

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

