**PySpark operations on RDDs**

The information comes from the guide linked below.

* [https://spark.apache.org/docs/latest/rdd-programming-guide.html](https://spark.apache.org/docs/latest/rdd-programming-guide.html)

In [None]:
from pyspark import SparkContext, SparkConf

In [None]:
conf = SparkConf().setAppName("appName").setMaster("local")
sc = SparkContext(conf=conf)

**1. Read the iris data (../data/iris.csv) and show the first ten lines.**

In [None]:
iris = sc.textFile("../data/iris.csv")
iris.take(10)    

**2. From the iris data, select the lines whose last column is 'setosa' and show the first ten of them.**

In [None]:
iris.filter(lambda line: 'setosa' == line.split(',')[-1]).take(10)

**3. From the iris data, sample 10 lines with replacement.**

In [None]:
# takeSample() is an action: it returns a Python list directly, so no transformation is needed
iris.takeSample(True, 10)

**4. From the iris data, take the 'setosa' lines and the 'versicolor' lines, sample each line with probability 1/10, and union the two samples.**

In [None]:
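# sample(withReplacement, fraction): with False, each line is kept independently with probability 0.1
# (0.1 is written out because 1/10 evaluates to 0 under Python 2 integer division)
setosa = iris.filter(lambda line: 'setosa' in line).sample(False, 0.1)
versicolor = iris.filter(lambda line: 'versicolor' in line).sample(False, 0.1)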

setosa.union(versicolor).collect()
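When `withReplacement` is `False`, `sample()` keeps each element independently with probability `fraction`, so the size of the result varies from run to run. The plain-Python sketch below only illustrates that behaviour and is not how Spark implements it; `bernoulli_sample` and `demo_lines` are hypothetical names introduced for the illustration.

```python
import random


def bernoulli_sample(items, fraction, seed=None):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < fraction]


# Hypothetical stand-in for the lines of an RDD
demo_lines = ["5.1,3.5,1.4,0.2,setosa"] * 50

demo_sample = bernoulli_sample(demo_lines, 0.1, seed=42)
# The length is random: about 5 on average, but not fixed
print(len(demo_sample))
```

This differs from `takeSample()`, which returns exactly the number of elements requested.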

**5. From the iris data, build key-value pairs whose key is the last column and whose value is the first column, then count the records per key.**

In [None]:
key_value_iris = iris.map(lambda line: (line.split(',')[-1], float(line.split(',')[0])))
key_value_iris.countByKey()
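`countByKey()` is an action that ignores the values and returns a dictionary-like object mapping each key to the number of pairs that carry it. A rough plain-Python equivalent, using a small hypothetical list `demo_pairs` in place of the RDD, could look like this:

```python
from collections import Counter

# Hypothetical (species, sepal_length) pairs standing in for the RDD
demo_pairs = [
    ("setosa", 5.1),
    ("setosa", 4.9),
    ("versicolor", 7.0),
    ("virginica", 6.3),
]

# Count how many pairs share each key; the values play no role
demo_counts = Counter(key for key, _ in demo_pairs)
print(dict(demo_counts))
```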

**6. Sum the values of the key-value data per key (species).**

In [None]:
key_value_iris.reduceByKey(lambda a,b:a+b).collect()
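`reduceByKey()` merges all values that share a key by applying the given two-argument function repeatedly. The sketch below mimics that idea on a plain list; `reduce_by_key` and `demo_kv` are hypothetical names, and Spark performs the merging per partition and across the cluster rather than in a single loop.

```python
def reduce_by_key(pairs, func):
    """Fold the values of each key together with `func`."""
    result = {}
    for key, value in pairs:
        if key in result:
            result[key] = func(result[key], value)
        else:
            result[key] = value
    return result


# Hypothetical (species, sepal_length) pairs
demo_kv = [("setosa", 5.0), ("setosa", 4.5), ("versicolor", 7.0)]

demo_totals = reduce_by_key(demo_kv, lambda a, b: a + b)
print(demo_totals)
```

Because Spark may combine values in any order, the function passed to `reduceByKey()` should be associative and commutative, as addition is here.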

**7. Sort the key-value data by key in descending order and show the first 10 records.**

In [None]:
key_value_iris.sortByKey(False).take(10)

**8. Show the number of rows in the iris data.**

In [None]:
iris.count()

**9. Using map() and reduce(), calculate the sum of all values in the iris data except the species column.**

In [None]:
iris.map(lambda line: line.split(',')[:-1]).map(lambda line: sum([float(fac) for fac in line])).reduce(lambda a,b:a+b)

In [None]:
# Do the same thing with smaller steps
# omit species column
value_columns = iris.map(lambda line: line.split(',')[:-1])

# sum per row
sum_per_row = value_columns.map(lambda line: sum([float(fac) for fac in line]))

# total
sum_per_row.reduce(lambda a,b: a+b)
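The same map-then-reduce pattern can be tried without a Spark context using Python's built-in `map()` and `functools.reduce()`. This is only an illustrative sketch on two hypothetical lines (`demo_rows`), not a replacement for the RDD version.

```python
from functools import reduce

# Two hypothetical CSV lines in the same layout as the iris data
demo_rows = [
    "5.0,3.5,1.5,0.25,setosa",
    "7.0,3.0,4.5,1.5,versicolor",
]

# Drop the species column from each line
demo_values = map(lambda line: line.split(",")[:-1], demo_rows)

# Sum the numeric fields of each row
demo_row_sums = map(lambda fields: sum(float(f) for f in fields), demo_values)

# Add the per-row sums together
demo_total = reduce(lambda a, b: a + b, demo_row_sums)
print(demo_total)
```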

**PySpark operations on DataFrames**

The information comes from the guide linked below.

* [https://spark.apache.org/docs/latest/sql-programming-guide.html](https://spark.apache.org/docs/latest/sql-programming-guide.html)


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()


**10. Read the cat data (../data/cat.json).**

In [None]:
# load json
cat = spark.read.json("../data/cat.json")

**11. Check the types of iris and cat.**

In [None]:
print("iris: {}".format(type(iris)))
print("cat: {}".format(type(cat)))

**12. Show the cat data.**

In [None]:
cat.show()

**13. Show the schema of cat data.**

In [None]:
cat.printSchema()

**14. Select the 'name' column of cat data and show it.**

In [None]:
cat.select("name").show()

**15. From the cat data, select the rows whose 'name' column is 'Deborah' and show them.**

In [None]:
cat.filter(cat["name"] == "Deborah").show()

**16. Using spark.sql(), show all the cat data.**

In [None]:
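# createOrReplaceGlobalTempView() lets this cell be re-run without a "view already exists" error
cat.createOrReplaceGlobalTempView("cat")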

In [None]:
spark.sql("SELECT * FROM global_temp.cat").show()

**17. Convert the iris data (whose type is `<class 'pyspark.rdd.RDD'>`) to a DataFrame and show it.**

In [None]:
iris_tuple = iris.map(lambda line: line.split(',')).map(lambda line: list(map(float, line[:-1])) + [str(line[-1])])

iris_dataframe = spark.createDataFrame(iris_tuple, ['sepal_length', 'sepal_width', 'petal_length', 'petal_width', 'species'])
iris_dataframe.show()
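To see what a single row looks like before it is handed to `createDataFrame()`, the parsing step can be tried on one string in plain Python. The helper name `parse_iris_line` and the sample line are hypothetical and used only for this illustration.

```python
def parse_iris_line(line):
    """Split a CSV line into four floats followed by the species string."""
    fields = line.split(",")
    return [float(x) for x in fields[:-1]] + [fields[-1]]


# Hypothetical line in the same layout as the iris data
demo_row = parse_iris_line("5.1,3.5,1.4,0.2,setosa")
print(demo_row)
```

Each parsed row lines up position by position with the five column names passed to `createDataFrame()`.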

**18. Check the type of each column of the iris DataFrame.**

In [None]:
iris_dataframe.dtypes

**19. Using spark.sql(), select the rows of iris where sepal_width > 3.0 and show them.**

In [None]:
# sepal_width > 3.0
iris_dataframe.createOrReplaceTempView("iris")
spark.sql("SELECT * FROM iris WHERE sepal_width > 3.0").show()

**20. Rename the columns of iris_dataframe to col_1 through col_5.**

In [None]:
new_name = ['col_' + str(i+1) for i in range(5)]
iris_dataframe = iris_dataframe.toDF(*new_name)

**21. Convert columns col_1 through col_4 of iris_dataframe to float type.**