In [2]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql._

val spark = SparkSession.builder()
.appName("types")
.getOrCreate()

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@648583af


### Movies Data

In [4]:
val mv = spark.read
    .option("inferSchema", "true")
    .json("./dataset/movies.json")

mv.show(2)

+-------------+--------+-----------+-----------+----------+-----------+-----------+-----------------+------------+----------------------+----------------+------+--------------------+------------+--------+---------------+
|Creative_Type|Director|Distributor|IMDB_Rating|IMDB_Votes|MPAA_Rating|Major_Genre|Production_Budget|Release_Date|Rotten_Tomatoes_Rating|Running_Time_min|Source|               Title|US_DVD_Sales|US_Gross|Worldwide_Gross|
+-------------+--------+-----------+-----------+----------+-----------+-----------+-----------------+------------+----------------------+----------------+------+--------------------+------------+--------+---------------+
|         NULL|    NULL|   Gramercy|        6.1|      1071|          R|       NULL|          8000000|   12-Jun-98|                  NULL|            NULL|  NULL|      The Land Girls|        NULL|  146083|         146083|
|         NULL|    NULL|     Strand|        6.9|       207|          R|      Drama|           300000|    7-Aug-98|  

mv: org.apache.spark.sql.DataFrame = [Creative_Type: string, Director: string ... 14 more fields]


In [5]:
//Bool

val dramaFilter = col("Major_Genre") equalTo "Drama"
val budgetFilter = col("Production_Budget") >= 400000
val doubleFilter = dramaFilter and budgetFilter

dramaFilter: org.apache.spark.sql.Column = (Major_Genre = Drama)
budgetFilter: org.apache.spark.sql.Column = (Production_Budget >= 400000)
doubleFilter: org.apache.spark.sql.Column = ((Major_Genre = Drama) AND (Production_Budget >= 400000))


In [8]:
mv.filter(doubleFilter).show(5)

+--------------------+--------------+--------------------+-----------+----------+-----------+-----------+-----------------+------------+----------------------+----------------+-------------------+--------------------+------------+--------+---------------+
|       Creative_Type|      Director|         Distributor|IMDB_Rating|IMDB_Votes|MPAA_Rating|Major_Genre|Production_Budget|Release_Date|Rotten_Tomatoes_Rating|Running_Time_min|             Source|               Title|US_DVD_Sales|US_Gross|Worldwide_Gross|
+--------------------+--------------+--------------------+-----------+----------+-----------+-----------+-----------------+------------+----------------------+----------------+-------------------+--------------------+------------+--------+---------------+
|Contemporary Fiction|          NULL|             Trimark|        3.4|       165|          R|      Drama|          1000000|    9-Oct-98|                    62|            NULL|Original Screenplay|                Slam|        NULL| 1

In [12]:
mv.select("Title").where(dramaFilter)

val myMovie = mv.select(col("Title"), doubleFilter.as("my_movie"))
myMovie.where("my_movie").show(5)

+--------------------+--------+
|               Title|my_movie|
+--------------------+--------+
|                Slam|    true|
|      Twelve Monkeys|    true|
|                1776|    true|
|    Twin Falls Idaho|    true|
|Forty Shades of Blue|    true|
+--------------------+--------+
only showing top 5 rows



myMovie: org.apache.spark.sql.DataFrame = [Title: string, my_movie: boolean]


In [15]:
val numberAdd = round((col("Rotten_Tomatoes_Rating")/10 + col("IMDB_Rating"))/2, 2)

mv.withColumn("Avg_Rating", numberAdd).show(5)

+--------------------+--------+-----------+-----------+----------+-----------+-----------+-----------------+------------+----------------------+----------------+-------------------+--------------------+------------+--------+---------------+----------+
|       Creative_Type|Director|Distributor|IMDB_Rating|IMDB_Votes|MPAA_Rating|Major_Genre|Production_Budget|Release_Date|Rotten_Tomatoes_Rating|Running_Time_min|             Source|               Title|US_DVD_Sales|US_Gross|Worldwide_Gross|Avg_Rating|
+--------------------+--------+-----------+-----------+----------+-----------+-----------+-----------------+------------+----------------------+----------------+-------------------+--------------------+------------+--------+---------------+----------+
|                NULL|    NULL|   Gramercy|        6.1|      1071|          R|       NULL|          8000000|   12-Jun-98|                  NULL|            NULL|               NULL|      The Land Girls|        NULL|  146083|         146083|    

numberAdd: org.apache.spark.sql.Column = round((((Rotten_Tomatoes_Rating / 10) + IMDB_Rating) / 2), 2)


In [16]:
// correlation

val relMv = mv.stat.corr("Rotten_Tomatoes_Rating", "IMDB_Rating")
println(relMv)

0.4259708986248317


relMv: Double = 0.4259708986248317


### Cars Data

In [4]:
val cars = spark.read.option("inferSchema", "true").json("./dataset/cars.json")
cars.show(3)

+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
|Acceleration|Cylinders|Displacement|Horsepower|Miles_per_Gallon|                Name|Origin|Weight_in_lbs|      Year|
+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
|        12.0|        8|       307.0|       130|            18.0|chevrolet chevell...|   USA|         3504|1970-01-01|
|        11.5|        8|       350.0|       165|            15.0|   buick skylark 320|   USA|         3693|1970-01-01|
|        11.0|        8|       318.0|       150|            18.0|  plymouth satellite|   USA|         3436|1970-01-01|
+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
only showing top 3 rows



cars: org.apache.spark.sql.DataFrame = [Acceleration: double, Cylinders: bigint ... 7 more fields]


In [5]:
// initcap -> 첫 단어를 대문자로
// lower <-> upper

cars.select(initcap(col("Name")))

cars.select(upper(col("Name"))).show(5)

+--------------------+
|         upper(Name)|
+--------------------+
|CHEVROLET CHEVELL...|
|   BUICK SKYLARK 320|
|  PLYMOUTH SATELLITE|
|       AMC REBEL SST|
|         FORD TORINO|
+--------------------+
only showing top 5 rows



In [6]:
// contains

cars.select("*").filter(col("Name").contains("volkswagen")).show(5, truncate=false)

+------------+---------+------------+----------+----------------+----------------------------+------+-------------+----------+
|Acceleration|Cylinders|Displacement|Horsepower|Miles_per_Gallon|Name                        |Origin|Weight_in_lbs|Year      |
+------------+---------+------------+----------+----------------+----------------------------+------+-------------+----------+
|20.5        |4        |97.0        |46        |26.0            |volkswagen 1131 deluxe sedan|Europe|1835         |1970-01-01|
|20.0        |4        |97.0        |48        |NULL            |volkswagen super beetle 117 |Europe|1978         |1971-01-01|
|19.0        |4        |97.0        |60        |27.0            |volkswagen model 111        |Europe|1834         |1971-01-01|
|23.5        |4        |97.0        |54        |23.0            |volkswagen type 3           |Europe|2254         |1972-01-01|
|18.0        |4        |121.0       |76        |22.0            |volkswagen 411 (sw)         |Europe|2511      

In [7]:
//regex

val regexString = "volkswagen|vw"
val vwDf = cars.select(col("Name"), regexp_extract(col("Name"), regexString, 0).as("reg_ext"))
                       .filter(col("reg_ext") =!= "")
vwDf.show(5)

+--------------------+----------+
|                Name|   reg_ext|
+--------------------+----------+
|volkswagen 1131 d...|volkswagen|
|volkswagen super ...|volkswagen|
|volkswagen model 111|volkswagen|
|   volkswagen type 3|volkswagen|
| volkswagen 411 (sw)|volkswagen|
+--------------------+----------+
only showing top 5 rows



regexString: String = volkswagen|vw
vwDf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Name: string, reg_ext: string]


In [42]:
// regexp_replace -> 특정 정규식에 부합하는 문자를 replace 한다.

vwDf.select(col("Name"), regexp_replace(col("reg_ext"), regexString, "Fixed_Value").as("reg_replace")).show(5)

+--------------------+-----------+
|                Name|reg_replace|
+--------------------+-----------+
|volkswagen 1131 d...|Fixed_Value|
|volkswagen super ...|Fixed_Value|
|volkswagen model 111|Fixed_Value|
|   volkswagen type 3|Fixed_Value|
| volkswagen 411 (sw)|Fixed_Value|
+--------------------+-----------+
only showing top 5 rows



In [62]:
// name의 첫 단어만 추출하여 brand 컬럼으로 만들고 group by count
cars.withColumn("Brand", split(col("Name")," ").getItem(0)).groupBy("Brand").count().show()

+---------+-----+
|    Brand|count|
+---------+-----+
|    buick|   17|
|  pontiac|   16|
| mercedes|    1|
|   toyota|   25|
|     saab|    5|
|      amc|   29|
|       vw|    6|
|  peugeot|    8|
| chrysler|    6|
| plymouth|   32|
|vokswagen|    1|
|  citroen|    1|
|    chevy|    3|
|     audi|    7|
|   datsun|   23|
|      bmw|    2|
|    dodge|   28|
|     ford|   53|
|  toyouta|    1|
|    capri|    1|
+---------+-----+
only showing top 20 rows



In [9]:
def extractCars(brand: String) = {
    val df = cars.select(col("Name"), regexp_extract(col("Name"), brand, 0).as("brand")).filter(col("brand")=!="")
    df                   
}

extractCars: (brand: String)org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]


In [10]:
val test1 = extractCars("benz")
test1.show()

+------------------+-----+
|              Name|brand|
+------------------+-----+
|mercedes-benz 280s| benz|
|mercedes benz 300d| benz|
|mercedes-benz 240d| benz|
+------------------+-----+



test1: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Name: string, brand: string]


In [14]:
// 예제

val getCarNames: List[String] = List("audi", "bmw", "benz")

val carMap = getCarNames.map(_.toLowerCase())

val joinMap = carMap.mkString("|")
println(joinMap)

audi|bmw|benz


getCarNames: List[String] = List(audi, bmw, benz)
carMap: List[String] = List(audi, bmw, benz)
joinMap: String = audi|bmw|benz


In [16]:
val mapdf = cars.select(col("Name"), regexp_extract(col("Name"), joinMap, 0).as("filtered"))
            .where(col("filtered")=!="")

mapdf.show()

+-------------------+--------+
|               Name|filtered|
+-------------------+--------+
|        audi 100 ls|    audi|
|           bmw 2002|     bmw|
|         audi 100ls|    audi|
|           audi fox|    audi|
|         audi 100ls|    audi|
| mercedes-benz 280s|    benz|
|           bmw 320i|     bmw|
|          audi 5000|    audi|
| mercedes benz 300d|    benz|
|          audi 4000|    audi|
|audi 5000s (diesel)|    audi|
| mercedes-benz 240d|    benz|
+-------------------+--------+



mapdf: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Name: string, filtered: string]


In [26]:
val test = getCarNames.map(_.toLowerCase()).map(name => name+"!!")

test.fold("dd")(_+_)

test: List[String] = List(audi!!, bmw!!, benz!!)
res20: String = ddaudi!!bmw!!benz!!


In [29]:
cars.show()

+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
|Acceleration|Cylinders|Displacement|Horsepower|Miles_per_Gallon|                Name|Origin|Weight_in_lbs|      Year|
+------------+---------+------------+----------+----------------+--------------------+------+-------------+----------+
|        12.0|        8|       307.0|       130|            18.0|chevrolet chevell...|   USA|         3504|1970-01-01|
|        11.5|        8|       350.0|       165|            15.0|   buick skylark 320|   USA|         3693|1970-01-01|
|        11.0|        8|       318.0|       150|            18.0|  plymouth satellite|   USA|         3436|1970-01-01|
|        12.0|        8|       304.0|       150|            16.0|       amc rebel sst|   USA|         3433|1970-01-01|
|        10.5|        8|       302.0|       140|            17.0|         ford torino|   USA|         3449|1970-01-01|
|        10.0|        8|       429.0|       198|

In [42]:
val carNameFilter = getCarNames.map(_.toLowerCase()).map(name => col("Name").contains(name))
val cyFilter = col("Cylinders") === 4
val bigFilter = carNameFilter.fold(lit(false))((carNameFilter, cyFilter) => carNameFilter or cyFilter)

// cars.where(carNameFilter).show()

carNameFilter: List[org.apache.spark.sql.Column] = List(contains(Name, audi), contains(Name, bmw), contains(Name, benz))
cyFilter: org.apache.spark.sql.Column = (Cylinders = 4)
bigFilter: org.apache.spark.sql.Column = (((false OR contains(Name, audi)) OR contains(Name, bmw)) OR contains(Name, benz))
res34: org.apache.spark.sql.Column = (((false OR contains(Name, audi)) OR contains(Name, bmw)) OR contains(Name, benz))


In [47]:
val bigFilter1 = carNameFilter.fold(lit(false))((combinedFilter, newCarNameFilter) => combinedFilter or newCarNameFilter)

bigFilter1: org.apache.spark.sql.Column = (((false OR contains(Name, audi)) OR contains(Name, bmw)) OR contains(Name, benz))


In [43]:
  cars.filter(bigFilter1).show()

+------------+---------+------------+----------+----------------+-------------------+------+-------------+----------+
|Acceleration|Cylinders|Displacement|Horsepower|Miles_per_Gallon|               Name|Origin|Weight_in_lbs|      Year|
+------------+---------+------------+----------+----------------+-------------------+------+-------------+----------+
|        14.5|        4|       107.0|        90|            24.0|        audi 100 ls|Europe|         2430|1970-01-01|
|        12.5|        4|       121.0|       113|            26.0|           bmw 2002|Europe|         2234|1970-01-01|
|        14.0|        4|       114.0|        91|            20.0|         audi 100ls|Europe|         2582|1973-01-01|
|        16.5|        4|        98.0|        83|            29.0|           audi fox|Europe|         2219|1974-01-01|
|        15.0|        4|       115.0|        95|            23.0|         audi 100ls|Europe|         2694|1975-01-01|
|        16.7|        6|       168.0|       120|        