<a href="https://colab.research.google.com/github/Thisun1997/spark_test/blob/main/sparkTest.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Point the JVM launcher and findspark at the JDK and the Spark tree unpacked above.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-2.4.7-bin-hadoop2.7",
)

In [None]:
# Peek at the local ./sample_data directory (appears to be Colab's bundled samples; not used by the Spark steps below).
sample_files = os.listdir('./sample_data')
print(sample_files)

['anscombe.json', 'README.md', 'mnist_test.csv', 'california_housing_test.csv', 'california_housing_train.csv', 'mnist_train_small.csv']


In [None]:
import findspark  # makes the unpacked Spark distribution's `pyspark` importable

# spark_home=None (the default) means "look it up from the environment", i.e. the SPARK_HOME set above.
findspark.init(spark_home=None)

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Local-mode context using all available cores; getOrCreate returns the existing
# context/session on re-run instead of trying to start a second one.
spark_conf = SparkConf().setMaster("local[*]")
sc = SparkContext.getOrCreate(spark_conf)
spark = SparkSession.builder.getOrCreate()

In [None]:
# Small tab-separated sample file under the Spark distribution's python/test_support directory.
DATA_FILE = '/content/spark-2.4.7-bin-hadoop2.7/python/test_support/sql/TestData/data.txt'
rdd = sc.textFile(DATA_FILE)

In [None]:
# Bring every line back to the driver to inspect the raw format (each line is "<value>\t<value>").
rdd.collect()

['1999\t1999', '2000\t2000', '2001\t2001', '2010\t2010', '2018\t2018']

In [None]:
# Turn each tab-separated line into a 2-field row (both fields stay strings).
# Empty lines are dropped first: "".split("\t") yields a 1-field row, which does not
# fit the 2-column schema passed to toDF.
df = (
    rdd
    .filter(lambda line: line != "")
    .map(lambda line: line.split("\t"))
    .toDF(["TEST", "TEST_PART"])
)

In [None]:
# Render the parsed rows as a table; the columns are still the raw split strings at this point.
df.show()

+----+---------+
|TEST|TEST_PART|
+----+---------+
|1999|     1999|
|2000|     2000|
|2001|     2001|
|2010|     2010|
|2018|     2018|
+----+---------+



In [None]:
# Confirm the inferred types: str.split produced strings, so both columns are string-typed.
df.printSchema()

root
 |-- TEST: string (nullable = true)
 |-- TEST_PART: string (nullable = true)



In [None]:
from pyspark.sql.types import DateType  # NOTE(review): unused in this cell; kept in case a later cell relies on it

# Cast both string columns to long and rename them in a single projection.
# Column order matches the previous withColumn/drop chain: TEST_COL, then TEST_PART_COL.
df1 = df.select(
    df['TEST'].cast('long').alias('TEST_COL'),
    df['TEST_PART'].cast('long').alias('TEST_PART_COL'),
)
df1.show()
print(df1.schema)

+--------+-------------+
|TEST_COL|TEST_PART_COL|
+--------+-------------+
|    1999|         1999|
|    2000|         2000|
|    2001|         2001|
|    2010|         2010|
|    2018|         2018|
+--------+-------------+

StructType(List(StructField(TEST_COL,LongType,true),StructField(TEST_PART_COL,LongType,true)))


In [None]:
# Verify the casts took effect: both columns should now be nullable longs.
df1.printSchema()

root
 |-- TEST_COL: long (nullable = true)
 |-- TEST_PART_COL: long (nullable = true)



In [None]:
# Persist df1 as Parquet under an explicit path, one sub-directory per TEST_PART_COL value,
# and register it in the session catalog as TEST_TABLE so the SQL cells below can query it.
# mode("overwrite") keeps the cell re-runnable: the default save mode ("errorifexists")
# raises as soon as TEST_TABLE already exists.
(
    df1.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("TEST_PART_COL")
    .option("path", "/content/spark-2.4.7-bin-hadoop2.7/python/test_support/sql/TestData/parquet")
    .saveAsTable("TEST_TABLE")
)


In [None]:
# Partition column equated to a data column, data column restricted by an OR of literals.
# In the optimized plan, Spark also derives (TEST_PART_COL = 2001 || TEST_PART_COL = 1999).
query = "SELECT * FROM TEST_TABLE WHERE TEST_PART_COL = TEST_COL AND (TEST_COL = 2001 OR TEST_COL = 1999)"
spark.sql(query).explain(extended=True)

== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('TEST_PART_COL = 'TEST_COL) && (('TEST_COL = 2001) || ('TEST_COL = 1999)))
   +- 'UnresolvedRelation `TEST_TABLE`

== Analyzed Logical Plan ==
TEST_COL: bigint, TEST_PART_COL: bigint
Project [TEST_COL#53L, TEST_PART_COL#54L]
+- Filter ((TEST_PART_COL#54L = TEST_COL#53L) && ((TEST_COL#53L = cast(2001 as bigint)) || (TEST_COL#53L = cast(1999 as bigint))))
   +- SubqueryAlias `default`.`test_table`
      +- Relation[TEST_COL#53L,TEST_PART_COL#54L] parquet

== Optimized Logical Plan ==
Project [TEST_COL#53L, TEST_PART_COL#54L]
+- Filter ((((isnotnull(TEST_PART_COL#54L) && isnotnull(TEST_COL#53L)) && ((TEST_PART_COL#54L = 2001) || (TEST_PART_COL#54L = 1999))) && (TEST_PART_COL#54L = TEST_COL#53L)) && ((TEST_COL#53L = 2001) || (TEST_COL#53L = 1999)))
   +- Relation[TEST_COL#53L,TEST_PART_COL#54L] parquet

== Physical Plan ==
*(1) Project [TEST_COL#53L, TEST_PART_COL#54L]
+- *(1) Filter ((isnotnull(TEST_COL#53L) && (TEST_PART_COL#54L = TEST_

In [None]:
# Same column equality, but a range predicate on the data column.
# The optimized plan derives the matching range on TEST_PART_COL (> 1999 and <= 2001).
query = "SELECT * FROM TEST_TABLE WHERE TEST_PART_COL = TEST_COL AND (TEST_COL > 1999 AND TEST_COL <= 2001)"
spark.sql(query).explain(extended=True)

== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('TEST_PART_COL = 'TEST_COL) && (('TEST_COL > 1999) && ('TEST_COL <= 2001)))
   +- 'UnresolvedRelation `TEST_TABLE`

== Analyzed Logical Plan ==
TEST_COL: bigint, TEST_PART_COL: bigint
Project [TEST_COL#53L, TEST_PART_COL#54L]
+- Filter ((TEST_PART_COL#54L = TEST_COL#53L) && ((TEST_COL#53L > cast(1999 as bigint)) && (TEST_COL#53L <= cast(2001 as bigint))))
   +- SubqueryAlias `default`.`test_table`
      +- Relation[TEST_COL#53L,TEST_PART_COL#54L] parquet

== Optimized Logical Plan ==
Project [TEST_COL#53L, TEST_PART_COL#54L]
+- Filter (((((((TEST_PART_COL#54L > 1999) && isnotnull(TEST_PART_COL#54L)) && (TEST_PART_COL#54L <= 2001)) && isnotnull(TEST_COL#53L)) && (TEST_PART_COL#54L = TEST_COL#53L)) && (TEST_COL#53L > 1999)) && (TEST_COL#53L <= 2001))
   +- Relation[TEST_COL#53L,TEST_PART_COL#54L] parquet

== Physical Plan ==
*(1) Project [TEST_COL#53L, TEST_PART_COL#54L]
+- *(1) Filter (((isnotnull(TEST_COL#53L) && (TEST_PART_COL#54L = 

In [None]:
# Partition column equated to ABS(TEST_COL) instead of TEST_COL itself.
# Here the optimized plan adds no literal predicate on TEST_PART_COL (only isnotnull).
query = "SELECT * FROM TEST_TABLE WHERE TEST_PART_COL = ABS(TEST_COL) AND (TEST_COL = 2001 OR TEST_COL = 1999)"
spark.sql(query).explain(extended=True)

== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('TEST_PART_COL = 'ABS('TEST_COL)) && (('TEST_COL = 2001) || ('TEST_COL = 1999)))
   +- 'UnresolvedRelation `TEST_TABLE`

== Analyzed Logical Plan ==
TEST_COL: bigint, TEST_PART_COL: bigint
Project [TEST_COL#53L, TEST_PART_COL#54L]
+- Filter ((TEST_PART_COL#54L = abs(TEST_COL#53L)) && ((TEST_COL#53L = cast(2001 as bigint)) || (TEST_COL#53L = cast(1999 as bigint))))
   +- SubqueryAlias `default`.`test_table`
      +- Relation[TEST_COL#53L,TEST_PART_COL#54L] parquet

== Optimized Logical Plan ==
Project [TEST_COL#53L, TEST_PART_COL#54L]
+- Filter (((isnotnull(TEST_PART_COL#54L) && isnotnull(TEST_COL#53L)) && (TEST_PART_COL#54L = abs(TEST_COL#53L))) && ((TEST_COL#53L = 2001) || (TEST_COL#53L = 1999)))
   +- Relation[TEST_COL#53L,TEST_PART_COL#54L] parquet

== Physical Plan ==
*(1) Project [TEST_COL#53L, TEST_PART_COL#54L]
+- *(1) Filter ((isnotnull(TEST_COL#53L) && (TEST_PART_COL#54L = abs(TEST_COL#53L))) && ((TEST_COL#53L = 2001) || (TES

In [None]:
# ABS() variant with a range predicate on the data column.
# As with the OR case, the optimized plan shows no derived range on TEST_PART_COL.
query = "SELECT * FROM TEST_TABLE WHERE TEST_PART_COL = ABS(TEST_COL) AND (TEST_COL > 1999 AND TEST_COL <= 2001)"
spark.sql(query).explain(extended=True)

== Parsed Logical Plan ==
'Project [*]
+- 'Filter (('TEST_PART_COL = 'ABS('TEST_COL)) && (('TEST_COL > 1999) && ('TEST_COL <= 2001)))
   +- 'UnresolvedRelation `TEST_TABLE`

== Analyzed Logical Plan ==
TEST_COL: bigint, TEST_PART_COL: bigint
Project [TEST_COL#53L, TEST_PART_COL#54L]
+- Filter ((TEST_PART_COL#54L = abs(TEST_COL#53L)) && ((TEST_COL#53L > cast(1999 as bigint)) && (TEST_COL#53L <= cast(2001 as bigint))))
   +- SubqueryAlias `default`.`test_table`
      +- Relation[TEST_COL#53L,TEST_PART_COL#54L] parquet

== Optimized Logical Plan ==
Project [TEST_COL#53L, TEST_PART_COL#54L]
+- Filter ((((isnotnull(TEST_PART_COL#54L) && isnotnull(TEST_COL#53L)) && (TEST_PART_COL#54L = abs(TEST_COL#53L))) && (TEST_COL#53L > 1999)) && (TEST_COL#53L <= 2001))
   +- Relation[TEST_COL#53L,TEST_PART_COL#54L] parquet

== Physical Plan ==
*(1) Project [TEST_COL#53L, TEST_PART_COL#54L]
+- *(1) Filter (((isnotnull(TEST_COL#53L) && (TEST_PART_COL#54L = abs(TEST_COL#53L))) && (TEST_COL#53L > 1999)) && 