In [30]:
# Load the 2015 summary JSON file into a DataFrame (schema is inferred by Spark).
df = spark.read.json("2015-summary.json")

In [31]:
from pyspark.sql.functions import lit, expr

# Keep every existing column and append a constant column "One" holding the literal 1.
one_literal = lit(1).alias("One")
df.select("*", one_literal).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [32]:
# withColumn appends a column named "numberOne" whose value is the literal 1 on every row.
with_number_one = df.withColumn("numberOne", lit(1))
with_number_one.show(2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [33]:
# Boolean column: true when DEST_COUNTRY_NAME equals ORIGIN_COUNTRY_NAME.
# This must run before the rename cell (In [34]), which rebinds df and drops DEST_COUNTRY_NAME.
# NOTE(review): "withCountry" reads like a typo for "withinCountry" — confirm before renaming.
same_country = expr("DEST_COUNTRY_NAME == ORIGIN_COUNTRY_NAME")
df.withColumn("withCountry", same_country).show(2)

+-----------------+-------------------+-----+-----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withCountry|
+-----------------+-------------------+-----+-----------+
|    United States|            Romania|   15|      false|
|    United States|            Croatia|    1|      false|
+-----------------+-------------------+-----+-----------+
only showing top 2 rows



In [34]:
# Rename DEST_COUNTRY_NAME -> dest.
# This rebinds `df`, so earlier cells that reference DEST_COUNTRY_NAME will fail
# if they are re-run after this one.
df = df.withColumnRenamed("DEST_COUNTRY_NAME", "dest")
# withColumnRenamed is a silent no-op when the source column is absent, so verify
# the rename actually took effect before later cells rely on `dest`.
assert "dest" in df.columns, "expected column 'dest' after renaming DEST_COUNTRY_NAME"

In [36]:
from pyspark.sql.functions import expr

# Column names may contain spaces and special characters such as "-".
# This ADDS a copy of ORIGIN_COUNTRY_NAME under such a name; nothing is renamed.
# Inside SQL expression strings the name must be backtick-quoted (see the next cell).
# NOTE(review): "Width" in the variable name likely meant "With"; kept because the next cell uses it.
long_col_name = "This Long Column-Name"
dfWidthLongColName = df.withColumn(long_col_name, expr("ORIGIN_COUNTRY_NAME"))

# Show the first two rows of the widened DataFrame.
dfWidthLongColName.show(2)


+-------------+-------------------+-----+---------------------+
|         dest|ORIGIN_COUNTRY_NAME|count|This Long Column-Name|
+-------------+-------------------+-----+---------------------+
|United States|            Romania|   15|              Romania|
|United States|            Croatia|    1|              Croatia|
+-------------+-------------------+-----+---------------------+
only showing top 2 rows



In [37]:
# Reference the special-character column via backtick-quoted SQL expressions:
# once as-is, and once aliased to another name containing a space.
dfWidthLongColName.select(
    expr("`This Long Column-Name`"),
    expr("`This Long Column-Name` as `new col`"),
).show(2)

+---------------------+-------+
|This Long Column-Name|new col|
+---------------------+-------+
|              Romania|Romania|
|              Croatia|Croatia|
+---------------------+-------+
only showing top 2 rows



In [38]:
# Extract rows where count < 2 AND ORIGIN_COUNTRY_NAME != 'Croatia'.
from pyspark.sql.functions import col

# Print the unfiltered frame first, for comparison with the filtered result.
df.show(2)

count_below_2 = col('count') < 2
origin_not_croatia = col('ORIGIN_COUNTRY_NAME') != 'Croatia'
df.where(count_below_2 & origin_not_croatia).show(2)

+-------------+-------------------+-----+
|         dest|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
+-------------+-------------------+-----+
only showing top 2 rows

+-------------+-------------------+-----+
|         dest|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|          Singapore|    1|
|      Moldova|      United States|    1|
+-------------+-------------------+-----+
only showing top 2 rows



In [46]:
# Same two predicates combined with OR: a row is kept when either condition holds.
count_below_2 = col('count') < 2
origin_not_croatia = col('ORIGIN_COUNTRY_NAME') != 'Croatia'
df.where(count_below_2 | origin_not_croatia).show(2)

+-------------+-------------------+-----+
|         dest|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|United States|            Romania|   15|
|United States|            Croatia|    1|
+-------------+-------------------+-----+
only showing top 2 rows



In [49]:
# Count distinct (destination, origin) country pairs.
# `dest` is the DEST_COUNTRY_NAME column after the rename cell.
country_pairs = df.select('dest', 'ORIGIN_COUNTRY_NAME')
country_pairs.dropDuplicates().count()

256

In [41]:
# Random sampling without replacement. A fixed seed makes re-runs over the same
# input return the same rows; `seed` is also reused by the randomSplit cell.
seed = 5
withReplacement = False
fraction = 0.01  # expected fraction of rows to sample; the returned row count is not exact

sampled = df.sample(withReplacement=withReplacement, fraction=fraction, seed=seed)
sampled.show()

+-------------+-------------------+-----+
|         dest|ORIGIN_COUNTRY_NAME|count|
+-------------+-------------------+-----+
|      Bolivia|      United States|   30|
|United States|         Cape Verde|   14|
|United States|            Vietnam|    2|
|United States|            Austria|   63|
+-------------+-------------------+-----+



In [42]:
# randomSplit returns a list of DataFrames, one per weight (0.25 / 0.75 here),
# seeded with the same `seed` as the sampling cell.
dataFrame = df.randomSplit([0.25, 0.75], seed)
quarter_part, remainder_part = dataFrame

# Is the 0.25-weighted part larger than the 0.75-weighted part? (expected: False)
quarter_part.count() > remainder_part.count()

                                                                                

False

In [44]:
# Build a small DataFrame of new rows that shares df's schema.
from pyspark.sql import Row

schema = df.schema
new_values = [
    ("New Country", "Other Country", 5),
    ("New Country2", "Other Country3", 11),
]
# Positional Rows, in the same column order as `schema`.
newRows = [Row(*values) for values in new_values]

# Distribute the rows as an RDD (parallelize), then apply df's schema to it.
parall = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parall, schema)
newDF.show()

[Stage 24:>                                                         (0 + 1) / 1]

+------------+-------------------+-----+
|        dest|ORIGIN_COUNTRY_NAME|count|
+------------+-------------------+-----+
| New Country|      Other Country|    5|
|New Country2|     Other Country3|   11|
+------------+-------------------+-----+



                                                                                