In [2]:
import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
import sys
import time

In [3]:
master = "local"
if len(sys.argv) == 2:
    master = sys.argv[1]
# 创建SparkSession
spark = SparkSession.builder.master(master).appName("BasicUsage").getOrCreate()
spark

In [5]:
# 创建SparkContext
sc = spark.sparkContext
sc

In [6]:
# 创建SQLContext
sqlContext = SQLContext(sc)
sqlContext

<pyspark.sql.context.SQLContext at 0x22c835fe108>

In [7]:
sc.defaultParallelism  # 默认并行度，使用几个CPU核

1

In [8]:
sc.getConf()  # 获取当前SparkConf

<pyspark.conf.SparkConf at 0x22c83f88948>

In [15]:
sc.getConf().toDebugString()

'spark.app.id=local-1608617241057\nspark.app.name=BasicUsage\nspark.driver.host=DESKTOP-4QACQ8P\nspark.driver.port=57329\nspark.executor.id=driver\nspark.master=local\nspark.rdd.compress=True\nspark.serializer.objectStreamReset=100\nspark.submit.deployMode=client\nspark.ui.showConsoleProgress=true'

In [14]:
sc.getConf().contains("spark.app.id")

True

In [73]:
# 数据文件路径
file_path = "../data/movies.csv"

### 设置将读取文件的schema结构

In [57]:
# 读取数据
from pyspark.sql.types import *
movies_schema = StructType({
    StructField("movie_id",IntegerType(),nullable=False),
    StructField("movie_name",StringType(),nullable=True),
    StructField("movie_categray",StringType(),nullable=True)
})
# <font color="red">读dat文件时可以对':'分隔符进行分隔，但是对'"::'就不可以</font>
# movies_df = spark.read.csv(file_path,sep='::',schema=movies_schema)  # 出错
# movies_df.show(5)

### 读取csv文件(第一行为列名/第一行不作为列名)

In [88]:
# movies_df = spark.read.csv(file_path,schema=movies_schema)  # 这种可以csv文件中第一行没有写列名的文件进行读取。
# movies_df = spark.read.format("csv").option("header","true").load(file_path,schema=movies_schema) # 这种是先用第一行作为列名之后，再将设置的schema作为列名进行更改。
movies_df = spark.read.format("csv").option("header","true").load(file_path) # 这种直接将csv第一行作为列名,这样读出来的数据自动将所有列作为String类型
movies_df.show(5)
movies_df

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



DataFrame[movieId: string, title: string, genres: string]

In [92]:
import pandas as pd
movies_df.limit(10).toPandas()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy
5,6,Heat (1995),Action|Crime|Thriller
6,7,Sabrina (1995),Comedy|Romance
7,8,Tom and Huck (1995),Adventure|Children
8,9,Sudden Death (1995),Action
9,10,GoldenEye (1995),Action|Adventure|Thriller


In [93]:
movies_df

DataFrame[movieId: string, title: string, genres: string]

## 利用movies_df来创建视图，进行sql查询

In [95]:
movies_df.createOrReplaceTempView("movies")

#### 利用SparkSession进行失sql查询

In [100]:
movie_limit = spark.sql("select * from movies limit 5")
movie_limit.show()
# movie_limit.show(2)
movie_limit # DataFrame[movieId: string, title: string, genres: string]

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+



DataFrame[movieId: string, title: string, genres: string]

#### 上面创建的视图与当前的sparksession是处于同一个会话中的。
#### <font color="red">注意如果换成了一个新的session，上面所创建的视图在新的会话中不可见</font>

In [101]:
spark.newSession().sql("select * from movies limit 5")  # 会报错，因为找不到movies这个视图

AnalysisException: 'Table or view not found: movies; line 1 pos 14'

In [102]:
# 但是之前的sparksession仍然能够访问
spark.sql("select * from movies limit 5").show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+



### 创建全局视图，为新的session提供同样的视图

In [103]:
movies_df.createOrReplaceGlobalTempView("movies_global")

#### <font color="red">对于全局视图，在查询的过程中视图名称前应该加上**global_temp.**才可以，否则报错</font>

In [105]:
spark.sql("select * from global_temp.movies_global limit 5").show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+



In [106]:
spark.newSession().sql("select * from global_temp.movies_global limit 5").show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+



In [107]:
spark.newSession()

## DSL语法

DSL语法风格不必去创建视图读取数据

In [109]:
movies_df.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



## 建议使用第二种查询方法

在查询的时候，可以直接写列名，也可以写成movies_df["列名"]，但是前者无法在filter和在查询的时候更新值的操作，但是后者可以。所以以后还是经常使用后者来进行查询等操作吧。

In [133]:
movies_df.select("title").show(2)
movies_df.select(movies_df['genres']).show(2)

+----------------+
|           title|
+----------------+
|Toy Story (1995)|
|  Jumanji (1995)|
+----------------+
only showing top 2 rows

+--------------------+
|              genres|
+--------------------+
|Adventure|Animati...|
|Adventure|Childre...|
+--------------------+
only showing top 2 rows



In [131]:
movies_df.select(movies_df['movieId'] + 1).show()

+-------------+
|(movieId + 1)|
+-------------+
|          2.0|
|          3.0|
|          4.0|
|          5.0|
|          6.0|
|          7.0|
|          8.0|
|          9.0|
|         10.0|
|         11.0|
|         12.0|
|         13.0|
|         14.0|
|         15.0|
|         16.0|
|         17.0|
|         18.0|
|         19.0|
|         20.0|
|         21.0|
+-------------+
only showing top 20 rows



In [142]:
# movies_df.filter("movieId">10)  # 这样写是错误的
movies_df.filter(movies_df["movieId"]>10).show(2)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
+-------+--------------------+--------------------+
only showing top 2 rows



In [147]:
movies_df.count()

9125

In [149]:
movies_df.filter(movies_df["title"].startswith("T")).show()
movies_df.filter(movies_df["title"].startswith("T")).count()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      8| Tom and Huck (1995)|  Adventure|Children|
|     32|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|
|     45|   To Die For (1995)|Comedy|Drama|Thri...|
|     64|Two if by Sea (1996)|      Comedy|Romance|
|     81|Things to Do in D...| Crime|Drama|Romance|
|    111|  Taxi Driver (1976)|Crime|Drama|Thriller|
|    200|Tie That Binds, T...|            Thriller|
|    201| Three Wishes (1995)|       Drama|Fantasy|
|    202|Total Eclipse (1995)|       Drama|Romance|
|    203|To Wong Foo, Than...|              Comedy|
|    245|The Glass Shield ...|         Crime|Drama|
|    306|Three Colors: Red...|               Drama|
|    307|Three Colors: Blu...|               Drama|
|    308|Three Colors: Whi...|        Comedy|Drama|
|    326|To Live (Huozhe) ...|               Drama|
|    327|   

656