### Spark on Google Colab

In [0]:
!ls


Oscars.csv   spark-2.3.4-bin-hadoop2.7	    spark-warehouse
sample_data  spark-2.3.4-bin-hadoop2.7.tgz


In [0]:
#Install JAVA, SPARK, HADOOP
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-eu.apache.org/dist/spark/spark-2.3.4/spark-2.3.4-bin-hadoop2.7.tgz
!tar xf spark-2.3.4-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
#establish Spark session
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.4-bin-hadoop2.7"

import findspark
findspark.init("spark-2.3.4-bin-hadoop2.7")

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [0]:
# import Python modules
import pandas as pd
import numpy as np
import datetime as dt

In [0]:
# Upload data files from local or web
from google.colab import files
files.upload()

Saving Oscars.csv to Oscars.csv


{'Oscars.csv': b'_unit_id,_golden,_unit_state,_trusted_judgments,_last_judgment_at,birthplace,birthplace:confidence,date_of_birth,date_of_birth:confidence,race_ethnicity,race_ethnicity:confidence,religion,religion:confidence,sexual_orientation,sexual_orientation:confidence,year_of_award,year_of_award:confidence,award,biourl,birthplace_gold,date_of_birth_gold,movie,person,race_ethnicity_gold,religion_gold,sexual_orientation_gold,year_of_award_gold\r670454353,FALSE,finalized,3,2/10/15 3:45,"Chisinau, Moldova",1,30-Sep-1895,1,White,1,Na,1,Straight,1,1927,1,Best Director,http://www.nndb.com/people/320/000043191/,,,Two Arabian Knights,Lewis Milestone,,,,\r670454354,FALSE,finalized,3,2/10/15 2:03,"Glasgow, Scotland",1,2-Feb-1886,1,White,1,Na,1,Straight,0.6842,1930,1,Best Director,http://www.nndb.com/people/626/000042500/,,,The Divine Lady,Frank Lloyd,,,,\r670454355,FALSE,finalized,3,2/10/15 2:05,"Chisinau, Moldova",1,30-Sep-1895,1,White,1,Na,1,Straight,1,1931,0.6667,Best Director,http://www.

In [0]:
df = spark.createDataFrame([{"hello": "world"} for x in range(10000)])
df.show(3)



+-----+
|hello|
+-----+
|world|
|world|
|world|
+-----+
only showing top 3 rows



In [0]:
# Sample Data
file_loc = './sample_data/california_housing_train.csv'
df = spark.read.csv(file_loc, inferSchema=True, header=True)
print(type(df))

<class 'pyspark.sql.dataframe.DataFrame'>


In [0]:
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
+---------+--------+----

In [0]:
df.schema

StructType(List(StructField(longitude,DoubleType,true),StructField(latitude,DoubleType,true),StructField(housing_median_age,DoubleType,true),StructField(total_rooms,DoubleType,true),StructField(total_bedrooms,DoubleType,true),StructField(population,DoubleType,true),StructField(households,DoubleType,true),StructField(median_income,DoubleType,true),StructField(median_house_value,DoubleType,true)))

In [0]:
df.dtypes

[('longitude', 'double'),
 ('latitude', 'double'),
 ('housing_median_age', 'double'),
 ('total_rooms', 'double'),
 ('total_bedrooms', 'double'),
 ('population', 'double'),
 ('households', 'double'),
 ('median_income', 'double'),
 ('median_house_value', 'double')]

In [0]:
df.count()

17000

In [0]:
df.select("longitude","households","median_income").show(10)

+---------+----------+-------------+
|longitude|households|median_income|
+---------+----------+-------------+
|  -114.31|     472.0|       1.4936|
|  -114.47|     463.0|         1.82|
|  -114.56|     117.0|       1.6509|
|  -114.57|     226.0|       3.1917|
|  -114.57|     262.0|        1.925|
|  -114.58|     239.0|       3.3438|
|  -114.58|     633.0|       2.6768|
|  -114.59|     158.0|       1.7083|
|  -114.59|    1056.0|       2.1782|
|   -114.6|     271.0|       2.1908|
+---------+----------+-------------+
only showing top 10 rows



In [0]:
df.filter(df['households'] <200).show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
|  -114.59|   34.83|              41.0|      812.0|         168.0|     375.0|     158.0|       1.7083|           48500.0|
|  -114.65|   32.79|              21.0|       44.0|          33.0|      64.0|      27.0|       0.8571|           25000.0|
|  -114.67|   33.92|              17.0|       97.0|          24.0|      29.0|      15.0|       1.2656|           27500.0|
|  -114.73|   33.43|              24.0|      796.0|         243.0|     227.0|     139.0|       0.8964|           59200.0|
|  -114.94|   34.55|    

In [0]:
df.groupby('longitude').count().show(10)

+---------+-----+
|longitude|count|
+---------+-----+
|  -119.04|   17|
|  -121.27|   41|
|  -122.61|   11|
|  -116.06|    2|
|  -117.55|   14|
|  -119.07|    9|
|   -120.3|    5|
|  -120.88|    7|
|  -121.22|   27|
|  -122.02|   61|
+---------+-----+
only showing top 10 rows



In [0]:
df.filter(df["population"].isin(227.0)).show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.73|   33.43|              24.0|      796.0|         243.0|     227.0|     139.0|       0.8964|           59200.0|
|  -118.14|   34.15|              52.0|      407.0|         160.0|     227.0|     148.0|       1.5156|          187500.0|
|  -121.11|   37.43|              42.0|      412.0|          75.0|     227.0|      75.0|          2.5|           74200.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+



In [0]:
df2=df.filter(df["population"].isin(227.0))

In [0]:
df2.count()

3

In [0]:
df.filter(df["median_income"].between(2, 24)).show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|
|  -114.58|   33.63|              29.0|     1387.0|         236.0|     671.0|     239.0|       3.3438|           74000.0|
|  -114.58|   33.61|              25.0|     2907.0|         680.0|    1841.0|     633.0|       2.6768|           82400.0|
|  -114.59|   33.61|              34.0|     4789.0|        1175.0|    3134.0|    1056.0|       2.1782|           58400.0|
|   -114.6|   34.83|              46.0|     1497.0|         309.0|     787.0|     271.0|       2.1908|           48100.0|
|   -114.6|   33.62|    

In [0]:
df.select("longitude","latitude",df["total_rooms"]+100).show(10) # when applying transformation use df["col"]

+---------+--------+-------------------+
|longitude|latitude|(total_rooms + 100)|
+---------+--------+-------------------+
|  -114.31|   34.19|             5712.0|
|  -114.47|    34.4|             7750.0|
|  -114.56|   33.69|              820.0|
|  -114.57|   33.64|             1601.0|
|  -114.57|   33.57|             1554.0|
|  -114.58|   33.63|             1487.0|
|  -114.58|   33.61|             3007.0|
|  -114.59|   34.83|              912.0|
|  -114.59|   33.61|             4889.0|
|   -114.6|   34.83|             1597.0|
+---------+--------+-------------------+
only showing top 10 rows



In [0]:
df.select("longitude","latitude","total_rooms").show(10)

+---------+--------+-----------+
|longitude|latitude|total_rooms|
+---------+--------+-----------+
|  -114.31|   34.19|     5612.0|
|  -114.47|    34.4|     7650.0|
|  -114.56|   33.69|      720.0|
|  -114.57|   33.64|     1501.0|
|  -114.57|   33.57|     1454.0|
|  -114.58|   33.63|     1387.0|
|  -114.58|   33.61|     2907.0|
|  -114.59|   34.83|      812.0|
|  -114.59|   33.61|     4789.0|
|   -114.6|   34.83|     1497.0|
+---------+--------+-----------+
only showing top 10 rows



In [0]:
from pyspark.sql import functions as F
df.select("*",F.when(df["median_income"] > 2, 1).otherwise(0).alias("inc_cat")).show(10) 



+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|inc_cat|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|      0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|      0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|      0|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|      1|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|  

In [0]:
df.describe().show()

+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|      total_rooms|   total_bedrooms|        population|       households|     median_income|median_house_value|
+-------+-------------------+------------------+------------------+-----------------+-----------------+------------------+-----------------+------------------+------------------+
|  count|              17000|             17000|             17000|            17000|            17000|             17000|            17000|             17000|             17000|
|   mean|-119.56210823529375|  35.6252247058827| 28.58935294117647|2643.664411764706|539.4108235294118|1429.5739411764705|501.2219411764706| 3.883578100000021|207300.91235294117|
| stddev| 2.0051664084260357|2.1373397946570867|12.586936981660406|2179.947071452777|421.4994515798648| 1

In [0]:
stat=df.describe()
stat


DataFrame[summary: string, longitude: string, latitude: string, housing_median_age: string, total_rooms: string, total_bedrooms: string, population: string, households: string, median_income: string, median_house_value: string]

In [0]:
stat.columns

['summary',
 'longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [0]:
stat.head(3)

[Row(summary='count', longitude='17000', latitude='17000', housing_median_age='17000', total_rooms='17000', total_bedrooms='17000', population='17000', households='17000', median_income='17000', median_house_value='17000'),
 Row(summary='mean', longitude='-119.56210823529375', latitude='35.6252247058827', housing_median_age='28.58935294117647', total_rooms='2643.664411764706', total_bedrooms='539.4108235294118', population='1429.5739411764705', households='501.2219411764706', median_income='3.883578100000021', median_house_value='207300.91235294117'),
 Row(summary='stddev', longitude='2.0051664084260357', latitude='2.1373397946570867', housing_median_age='12.586936981660406', total_rooms='2179.947071452777', total_bedrooms='421.4994515798648', population='1147.852959159527', households='384.5208408559016', median_income='1.9081565183791036', median_house_value='115983.76438720895')]

In [0]:
stat[2]

Column<b'latitude'>

In [0]:
#add new column
df = df.withColumn("New",df["median_income"]/df["median_house_value"])

In [0]:
df.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value',
 'New']

In [0]:
df=df.drop("New")

In [0]:
df.columns

['longitude',
 'latitude',
 'housing_median_age',
 'total_rooms',
 'total_bedrooms',
 'population',
 'households',
 'median_income',
 'median_house_value']

In [0]:
df.groupby('longitude').mean("median_income").show(10)

+---------+------------------+
|longitude|avg(median_income)|
+---------+------------------+
|  -119.04| 2.992947058823529|
|  -121.27| 2.976824390243902|
|  -122.61|4.4254727272727274|
|  -116.06|           1.79185|
|  -117.55|            4.3996|
|  -119.07| 3.374211111111111|
|   -120.3|2.7403799999999996|
|  -120.88| 3.538828571428571|
|  -121.22|3.3738740740740734|
|  -122.02| 5.192816393442622|
+---------+------------------+
only showing top 10 rows



In [0]:
df_sorted=df.sort("total_rooms", ascending=False) 

In [0]:
df_sorted.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -117.74|   33.89|               4.0|    37937.0|        5471.0|   16122.0|    5189.0|       7.4947|          366300.0|
|  -121.79|   36.64|              11.0|    32627.0|        6445.0|   28566.0|    6082.0|       2.3087|          118800.0|
|  -117.78|   34.03|               8.0|    32054.0|        5290.0|   15507.0|    5050.0|       6.0191|          253900.0|
|  -118.78|   34.16|               9.0|    30405.0|        4093.0|   12873.0|    3931.0|       8.0137|          399200.0|
|  -117.12|   33.52|               4.0|    30401.0|        4957.0|   13251.0|    4339.0|       4.5841|          212300.0|
|  -121.92|   37.53|    

In [0]:
df_ordered=df.orderBy("total_rooms",ascending=False)

In [0]:
df_ordered.show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -117.74|   33.89|               4.0|    37937.0|        5471.0|   16122.0|    5189.0|       7.4947|          366300.0|
|  -121.79|   36.64|              11.0|    32627.0|        6445.0|   28566.0|    6082.0|       2.3087|          118800.0|
|  -117.78|   34.03|               8.0|    32054.0|        5290.0|   15507.0|    5050.0|       6.0191|          253900.0|
|  -118.78|   34.16|               9.0|    30405.0|        4093.0|   12873.0|    3931.0|       8.0137|          399200.0|
|  -117.12|   33.52|               4.0|    30401.0|        4957.0|   13251.0|    4339.0|       4.5841|          212300.0|
|  -121.92|   37.53|    

In [0]:
#from pyspark.sql import functions as F
#df.withColumn('Room_cat', F.when(df["median_income"] > 1, 1).when(df["median_income"]>5,2).otherwise(0)).show(10)
#OR
df.select("*", F.when(df["median_income"] > 1, 1).when(df["median_income"]>5,2).otherwise(0).alias("Room_cat")).show(10)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|Room_cat|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+--------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|       1|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|       1|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|       1|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0|       1|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     2

In [0]:
df=df.withColumn('Room_cat', F.when(df["median_income"] > 5, 3).when(df["median_income"]>3,2).otherwise(1))
df.groupby("Room_cat").count().orderBy("Room_cat").show()

+--------+-----+
|Room_cat|count|
+--------+-----+
|       1| 6077|
|       2| 7171|
|       3| 3752|
+--------+-----+



In [0]:
#output
df.select("*").write.save("CA_Housing.json",format="json")

### Movies data


In [0]:
movies = spark.read.csv('./Oscars.csv', inferSchema=True, header=True)


In [0]:
movies.createOrReplaceTempView("oscar2")
spark.sql("SELECT distinct _unit_state, count(*) as FREQ FROM oscar2 WHERE year_of_award>1900 GROUP BY _unit_state").show(10)

+-----------+----+
|_unit_state|FREQ|
+-----------+----+
|     golden|  25|
|  finalized| 416|
+-----------+----+



In [0]:
df2=spark.sql("SELECT * FROM oscar2 WHERE year_of_award>1900")
#spark.sql("CREATE TABLE df2 AS SELECT * FROM oscar2 WHERE year_of_award>1900")

In [0]:
#df2.createOrReplaceTempView("df2v") #you have to build view for each dataframe to use spark SQL
spark.sql("SELECT * FROM df2 WHERE year_of_award>1903").show(10)


In [0]:
#save to CSV
movies.write.csv("movies_new.csv")
!ls movies_new.csv

part-00000-58ab17ad-5634-4d90-b4ba-be3a2ebf06c0-c000.csv  _SUCCESS


In [0]:
vlist=movies.columns
vlist[7]

'date_of_birth'

In [0]:
#from pyspark.sql.types import DateType
#movies = movies.withColumn("BDT", movies[vlist[7]].cast(DateType()))

In [0]:
from pyspark.sql import functions as F
movies=movies.select("*",F.to_date("date_of_birth", "dd-MMM-yyyy").alias("BDT"))
#alternaive not tested
#from pyspark.sql.functions import expr
#df.withColumn("test3",expr("to_date(value, format)")).show()

In [0]:
spark.sql("SELECT date_of_birth, to_date(date_of_birth, 'dd-MMM-yyyy') as BDT FROM oscar2").show(10)

+-------------+----------+
|date_of_birth|       BDT|
+-------------+----------+
|  30-Sep-1895|1895-09-30|
|   2-Feb-1886|1886-02-02|
|  30-Sep-1895|1895-09-30|
|  23-Feb-1899|1899-02-23|
|  23-Apr-1894|1894-04-23|
|   2-Feb-1886|1886-02-02|
|  18-May-1897|1897-05-18|
|   1-Feb-1894|1894-02-01|
|  18-May-1897|1897-05-18|
|   3-Oct-1898|1898-10-03|
+-------------+----------+
only showing top 10 rows



In [0]:
spark.sql("SELECT min(year(to_date(date_of_birth, 'd-MMM-yyyy'))) as min_y, \
           max(year(to_date(date_of_birth, 'd-MMM-yyyy'))) as max_y FROM oscar2").show()          

+-----+-----+
|min_y|max_y|
+-----+-----+
|    4| 1936|
+-----+-----+



In [0]:
spark.sql("SELECT date_of_birth \
           FROM oscar2 WHERE year(to_date(date_of_birth, 'dd-MMM-yyyy'))<=99").show(10)          

+-------------+
|date_of_birth|
+-------------+
|    11-Oct-18|
|    22-Jun-06|
|     7-Sep-09|
|     5-Aug-06|
|    11-Feb-09|
|    11-Feb-09|
|    18-Dec-04|
|    29-Apr-07|
|     7-Sep-09|
|    30-Jan-20|
+-------------+
only showing top 10 rows



In [0]:
spark.sql("SELECT count(*) \
           FROM oscar2 WHERE year(to_date(date_of_birth, 'dd-MMM-yyyy'))<=100").show(10)          

+--------+
|count(1)|
+--------+
|     361|
+--------+



In [0]:
spark.sql("SELECT date_of_birth, CASE WHEN year(to_date(date_of_birth, 'dd-MMM-yyyy')) < 100 THEN year(to_date(date_of_birth, 'dd-MMM-yyyy'))+1900 \
           ELSE year(to_date(date_of_birth, 'dd-MMM-yyyy')) END as YR FROM oscar2 WHERE year(to_date(date_of_birth, 'dd-MMM-yyyy'))=4").show(10)          

+-------------+----+
|date_of_birth|  YR|
+-------------+----+
|    18-Dec-04|1904|
|    18-Dec-04|1904|
|    14-Apr-04|1904|
|    31-May-04|1904|
|    29-Sep-04|1904|
+-------------+----+



In [0]:
movies2=spark.sql("SELECT *, CASE WHEN year(to_date(date_of_birth, 'dd-MMM-yyyy')) < 100 \
           THEN year(to_date(date_of_birth, 'dd-MMM-yyyy'))+1900 \
           ELSE year(to_date(date_of_birth, 'dd-MMM-yyyy')) END as YR FROM oscar2")          

In [0]:
movies2.dtypes

[('_unit_id', 'int'),
 ('_golden', 'boolean'),
 ('_unit_state', 'string'),
 ('_trusted_judgments', 'int'),
 ('_last_judgment_at', 'string'),
 ('birthplace', 'string'),
 ('birthplace:confidence', 'double'),
 ('date_of_birth', 'string'),
 ('date_of_birth:confidence', 'double'),
 ('race_ethnicity', 'string'),
 ('race_ethnicity:confidence', 'double'),
 ('religion', 'string'),
 ('religion:confidence', 'double'),
 ('sexual_orientation', 'string'),
 ('sexual_orientation:confidence', 'double'),
 ('year_of_award', 'int'),
 ('year_of_award:confidence', 'double'),
 ('award', 'string'),
 ('biourl', 'string'),
 ('birthplace_gold', 'string'),
 ('date_of_birth_gold', 'string'),
 ('movie', 'string'),
 ('person', 'string'),
 ('race_ethnicity_gold', 'string'),
 ('religion_gold', 'string'),
 ('sexual_orientation_gold', 'string'),
 ('year_of_award_gold', 'int'),
 ('YR', 'int')]

In [0]:
movies[vlist[7],"BDT"].show(10)

NameError: ignored

In [0]:
movies=movies.withColumn("recency",F.when(F.year(movies["BDT"])>1884,1).otherwise(0))

In [0]:
movies.groupBy("recency").count().show()

+-------+-----+
|recency|count|
+-------+-----+
|      1|   67|
|      0|  374|
+-------+-----+



In [0]:
#change data type
#from pyspark.sql.types import DoubleType
#movies = movies.withColumn("vote_avg_num", movies["vote_average"].cast(DoubleType()))


In [0]:
len(movies.columns)

29

In [0]:
vlist=movies.columns
vlist[-2:]

['BDT', 'recency']

In [0]:
movies.groupBy(F.year("BDT")).count().orderBy(F.year("BDT"), ascending=False).show(5)

+---------+-----+
|year(BDT)|count|
+---------+-----+
|     1936|    1|
|     1903|    6|
|     1902|    5|
|     1901|    8|
|     1900|    5|
+---------+-----+
only showing top 5 rows



In [0]:
#movies.select([count(when(isnan(c), c)) for c in movies.columns]).show()
for c in vlist:
  K=movies.filter(movies[c].isNull()).count()
  print(c, K)

_unit_id 0
_golden 0
_unit_state 0
_trusted_judgments 0
_last_judgment_at 25
birthplace 0
birthplace:confidence 0
date_of_birth 0
date_of_birth:confidence 0
race_ethnicity 0
race_ethnicity:confidence 0
religion 0
religion:confidence 0
sexual_orientation 0
sexual_orientation:confidence 0
year_of_award 0
year_of_award:confidence 0
award 0
biourl 0
birthplace_gold 432
date_of_birth_gold 433
movie 0
person 0
race_ethnicity_gold 439
religion_gold 431
sexual_orientation_gold 438
year_of_award_gold 430
BDT 1
recency 0


In [0]:
#movies.where(movies["race_ethnicity_gold"].isNull()).select("*").show(5)
#OR
#movies.select("*").filter(movies["race_ethnicity_gold"].isNull()).show(5)
#OR 
movies.filter(movies["race_ethnicity_gold"].isNull()).select("*").show(5)

+---------+-------+-----------+------------------+-----------------+------------------+---------------------+-------------+------------------------+--------------+-------------------------+--------------+-------------------+------------------+-----------------------------+-------------+------------------------+-------------+--------------------+---------------+------------------+--------------------+---------------+-------------------+-------------+-----------------------+------------------+----------+-------+
| _unit_id|_golden|_unit_state|_trusted_judgments|_last_judgment_at|        birthplace|birthplace:confidence|date_of_birth|date_of_birth:confidence|race_ethnicity|race_ethnicity:confidence|      religion|religion:confidence|sexual_orientation|sexual_orientation:confidence|year_of_award|year_of_award:confidence|        award|              biourl|birthplace_gold|date_of_birth_gold|               movie|         person|race_ethnicity_gold|religion_gold|sexual_orientation_gold|year_of

In [0]:
movies.schema
#["title"]
#fields = [StructField("title", StringType(), True)]
#schema = StructType(fields)


StructType(List(StructField(_unit_id,IntegerType,true),StructField(_golden,BooleanType,true),StructField(_unit_state,StringType,true),StructField(_trusted_judgments,IntegerType,true),StructField(_last_judgment_at,StringType,true),StructField(birthplace,StringType,true),StructField(birthplace:confidence,DoubleType,true),StructField(date_of_birth,StringType,true),StructField(date_of_birth:confidence,DoubleType,true),StructField(race_ethnicity,StringType,true),StructField(race_ethnicity:confidence,DoubleType,true),StructField(religion,StringType,true),StructField(religion:confidence,DoubleType,true),StructField(sexual_orientation,StringType,true),StructField(sexual_orientation:confidence,DoubleType,true),StructField(year_of_award,IntegerType,true),StructField(year_of_award:confidence,DoubleType,true),StructField(award,StringType,true),StructField(biourl,StringType,true),StructField(birthplace_gold,StringType,true),StructField(date_of_birth_gold,StringType,true),StructField(movie,StringTyp

In [0]:
#import os
#from pyspark import SparkConf

#SUBMIT_ARGS = "--packages com.databricks:spark-csv_2.11:1.2.0 pyspark-shell"
#os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
#conf = SparkConf()

#from pyspark import SparkContext
#sc = SparkContext(conf=conf)

In [0]:
movies.select("_unit_state").distinct().show()

+-----------+
|_unit_state|
+-----------+
|     golden|
|  finalized|
+-----------+



In [0]:
movies.groupBy("_unit_state").mean("religion:confidence").show()

+-----------+------------------------+
|_unit_state|avg(religion:confidence)|
+-----------+------------------------+
|     golden|      0.9954040000000001|
|  finalized|      0.9890108173076926|
+-----------+------------------------+



In [0]:
#movies=movies.withColumn("BYR",F.year('BDT'))
#movies=movies.withColumn("BYR_New", F.when(movies['BYR']<1000,1900+movies["BYR"]).otherwse(movies["BYR"]))
movies.dtypes

[('_unit_id', 'int'),
 ('_golden', 'boolean'),
 ('_unit_state', 'string'),
 ('_trusted_judgments', 'int'),
 ('_last_judgment_at', 'string'),
 ('birthplace', 'string'),
 ('birthplace:confidence', 'double'),
 ('date_of_birth', 'string'),
 ('date_of_birth:confidence', 'double'),
 ('race_ethnicity', 'string'),
 ('race_ethnicity:confidence', 'double'),
 ('religion', 'string'),
 ('religion:confidence', 'double'),
 ('sexual_orientation', 'string'),
 ('sexual_orientation:confidence', 'double'),
 ('year_of_award', 'int'),
 ('year_of_award:confidence', 'double'),
 ('award', 'string'),
 ('biourl', 'string'),
 ('birthplace_gold', 'string'),
 ('date_of_birth_gold', 'string'),
 ('movie', 'string'),
 ('person', 'string'),
 ('race_ethnicity_gold', 'string'),
 ('religion_gold', 'string'),
 ('sexual_orientation_gold', 'string'),
 ('year_of_award_gold', 'int'),
 ('BDT', 'date'),
 ('recency', 'int'),
 ('Age', 'int'),
 ('BYR', 'int'),
 ('BYR_New', 'int')]

In [0]:
movies.where(movies[])select('BYR_New','BYR',F.year(movies['BDT'])).filter(show(10)

+-------+----+---------+
|BYR_New| BYR|year(BDT)|
+-------+----+---------+
|   null|1895|     1895|
|   null|1886|     1886|
|   null|1895|     1895|
|   null|1899|     1899|
|   null|1894|     1894|
|   null|1886|     1886|
|   null|1897|     1897|
|   null|1894|     1894|
|   null|1897|     1897|
|   null|1898|     1898|
+-------+----+---------+
only showing top 10 rows



In [0]:
movies=movies.withColumn("Age", movies["year_of_award"]-F.year(movies["BDT"]))

In [0]:
movies.select("Age","year_of_award", "BDT").show(10)

+---+-------------+----------+
|Age|year_of_award|       BDT|
+---+-------------+----------+
| 32|         1927|1895-09-30|
| 44|         1930|1886-02-02|
| 36|         1931|1895-09-30|
| 33|         1932|1899-02-23|
| 39|         1933|1894-04-23|
| 48|         1934|1886-02-02|
| 38|         1935|1897-05-18|
| 42|         1936|1894-02-01|
| 40|         1937|1897-05-18|
| 40|         1938|1898-10-03|
+---+-------------+----------+
only showing top 10 rows



In [0]:
K=-1
for c in movies.columns:
  K+=1
  print(K, c)


0 _unit_id
1 _golden
2 _unit_state
3 _trusted_judgments
4 _last_judgment_at
5 birthplace
6 birthplace:confidence
7 date_of_birth
8 date_of_birth:confidence
9 race_ethnicity
10 race_ethnicity:confidence
11 religion
12 religion:confidence
13 sexual_orientation
14 sexual_orientation:confidence
15 year_of_award
16 year_of_award:confidence
17 award
18 biourl
19 birthplace_gold
20 date_of_birth_gold
21 movie
22 person
23 race_ethnicity_gold
24 religion_gold
25 sexual_orientation_gold
26 year_of_award_gold
27 BDT
28 recency
29 Age


In [0]:
movies.groupBy("religion").mean("Age").show()

+--------------------+------------------+
|            religion|          avg(Age)|
+--------------------+------------------+
|            Agnostic|            1978.0|
|   Christian Science|            1931.5|
|              Quaker|            1952.5|
|             Baptist|            1938.8|
|           Christian|             989.0|
|      Roman Catholic| 1382.032786885246|
|            Lutheran|1946.6666666666667|
|   Congregationalist|            1933.0|
|Anglican/episcopa...|            1000.0|
|            Buddhist|            1931.0|
|               Deist|            1939.0|
|Born-Again Christian|            1938.4|
| Disciples of Christ|            1940.0|
|          Protestant|            1565.2|
|              Jewish|           1792.32|
|                  Na|1561.8320895522388|
|               Hindu|            1934.0|
|              Sufism|            1943.0|
|           Methodist|            1927.0|
|        Presbyterian|            1934.5|
+--------------------+------------

In [0]:
movies.describe().show()

+-------+-------------------+-----------+------------------+-----------------+--------------------+---------------------+-------------+------------------------+--------------+-------------------------+--------+-------------------+------------------+-----------------------------+------------------+------------------------+--------------------+--------------------+--------------------+------------------+----------------+------------+-------------------+--------------------+-----------------------+------------------+-------------------+-----------------+
|summary|           _unit_id|_unit_state|_trusted_judgments|_last_judgment_at|          birthplace|birthplace:confidence|date_of_birth|date_of_birth:confidence|race_ethnicity|race_ethnicity:confidence|religion|religion:confidence|sexual_orientation|sexual_orientation:confidence|     year_of_award|year_of_award:confidence|               award|              biourl|     birthplace_gold|date_of_birth_gold|           movie|      person|race_et

In [0]:
movies.select("_unit_id","date_of_birth","BDT","Age","date_of_birth").filter(movies["Age"]>100).show(10)

+---------+-------------+----------+----+-------------+
| _unit_id|date_of_birth|       BDT| Age|date_of_birth|
+---------+-------------+----------+----+-------------+
|670454370|    11-Oct-18|0018-10-11|1944|    11-Oct-18|
|670454371|    22-Jun-06|0006-06-22|1940|    22-Jun-06|
|670454373|     7-Sep-09|0009-09-07|1939|     7-Sep-09|
|670454374|     5-Aug-06|0006-08-05|1943|     5-Aug-06|
|670454375|    11-Feb-09|0009-02-11|1941|    11-Feb-09|
|670454376|    11-Feb-09|0009-02-11|1942|    11-Feb-09|
|670454377|    18-Dec-04|0004-12-18|1948|    18-Dec-04|
|670454379|    29-Apr-07|0007-04-29|1947|    29-Apr-07|
|670454380|     7-Sep-09|0009-09-07|1946|     7-Sep-09|
|670454381|    30-Jan-20|0020-01-30|1936|    30-Jan-20|
+---------+-------------+----------+----+-------------+
only showing top 10 rows



In [0]:
movies.select("_unit_id").distinct().count()

441

In [0]:
movies.count()

441

In [0]:
31000/172000

0.18023255813953487