In [14]:
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster('local').setAppName('my app')
sc = SparkContext(conf=conf)
stringJSONRDD = sc.parallelize(("""
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue"
  }""")
)

ValueError: Cannot run multiple SparkContexts at once; existing SparkContext(app=my app, master=local) created by __init__ at <ipython-input-3-cf928bf41a47>:3 

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("Word Count").config("spark.some.config.option", "some-value").getOrCreate()
swimmersJSON = spark.read.json(stringJSONRDD)

In [15]:
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [16]:
# 创建临时表
swimmersJSON.createOrReplaceTempView('swimmersJsontable')

In [22]:
# 编写sql查询
spark.sql("select 'bro'||'wn' ").show()

+---------------+
|concat(bro, wn)|
+---------------+
|          brown|
+---------------+



In [9]:
spark.sql('select * from swimmersJsontable').show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [10]:
spark.sql('select * from swimmersJsontable').collect()  # 返回所有行   不建议使用

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='blue', id='345', name='Simone')]

### 使用反射来推断模式

In [11]:
swimmersJSON.printSchema()  # 打印出模式定义

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



#### 编程指定模式

In [13]:
fraud_train,fraud_test =swimmersJSON.randomSplit([0.7,0.3])

In [15]:
fraud_train.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 22|   green|234|Michael|
| 23|    blue|345| Simone|
+---+--------+---+-------+



In [16]:
fraud_test.show()

+---+--------+---+-----+
|age|eyeColor| id| name|
+---+--------+---+-----+
| 19|   brown|123|Katie|
+---+--------+---+-----+



In [5]:
from pyspark.sql.types import *
#生成以逗号分隔的数据
stringCSVRDD = sc.parallelize([(123, 'Katie', 19, 'brown'),( None, 'Michael', 22, 'grbeen'),(235, 'Michael', 22, 'grbeen'), (234, 'Michael', 22, 'green'), (345, 'Simone', 23, 'blue')])

In [57]:
###  StructField被分解为一下方面
# · Name 该字段的名字
# . dataType 该字段的数据类型
# . nullable 该字段的值是否为空
schema = StructType([
    StructField('id',LongType(),True),
    StructField('name',StringType(),True),
    StructField('age',LongType(),True),
    StructField('eyeColor',StringType(),True)
])


In [58]:
# 最后为我们的RDD应用该模式，并创建DataFrame
swimmers = spark.createDataFrame(data=stringCSVRDD,schema=schema)

In [59]:
# 利用dataframe创建一个临时视图
swimmers.createOrReplaceTempView('swimmers')

In [60]:
swimmers.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)



In [12]:
### 利用dataframeAPI查询
#  collect（） show() take()  show take包含了限制返回行数的选项
#行数
print(swimmers.count())
swimmers.show()
import pyspark.sql.functions as fn  #  导入所有的函数

swimmers.agg(
    fn.count('id').alias('id_count')
).show()

NameError: name 'swimmers' is not defined

In [196]:
test1 = swimmers.withColumn("income2",swimmers.id + 2000)
test1.show()
#2.给test1增加一列数据'label',当gender=='M'时,label=1,否则label=0.
test2 = test1.withColumn("label",fn.when(swimmers.age <=19,1).otherwise(0))
test2.show() 
test2.filter(test2.id>1).show()
#3.给test2增加一列数据'thedate',其值固定为'2018-04-11'
test3 = test2.withColumn("thedate",fn.lit('2018-04-11'))
test3.show()

+----+-------+---+--------+-------+
|  id|   name|age|eyeColor|income2|
+----+-------+---+--------+-------+
| 123|  Katie| 19|   brown|   2123|
|null|Michael| 22|  grbeen|   null|
| 235|Michael| 22|  grbeen|   2235|
| 234|Michael| 22|   green|   2234|
| 345| Simone| 23|    blue|   2345|
+----+-------+---+--------+-------+

+----+-------+---+--------+-------+-----+
|  id|   name|age|eyeColor|income2|label|
+----+-------+---+--------+-------+-----+
| 123|  Katie| 19|   brown|   2123|    1|
|null|Michael| 22|  grbeen|   null|    0|
| 235|Michael| 22|  grbeen|   2235|    0|
| 234|Michael| 22|   green|   2234|    0|
| 345| Simone| 23|    blue|   2345|    0|
+----+-------+---+--------+-------+-----+

+---+-------+---+--------+-------+-----+
| id|   name|age|eyeColor|income2|label|
+---+-------+---+--------+-------+-----+
|123|  Katie| 19|   brown|   2123|    1|
|235|Michael| 22|  grbeen|   2235|    0|
|234|Michael| 22|   green|   2234|    0|
|345| Simone| 23|    blue|   2345|    0|
+---+----

In [115]:
# 重命名已存在的列
test3.withColumnRenamed('name', 'name1').select([c for c in  test3.columns if c!='name' ]+['name1']).show(2)

+----+---+--------+-------+-----+----------+-------+
|  id|age|eyeColor|income2|label|   thedate|  name1|
+----+---+--------+-------+-----+----------+-------+
| 123| 19|   brown|   2123|    1|2018-04-11|  Katie|
|null| 22|  grbeen|   null|    0|2018-04-11|Michael|
+----+---+--------+-------+-----+----------+-------+
only showing top 2 rows



In [106]:
test3.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- income2: long (nullable = true)
 |-- label: integer (nullable = false)
 |-- thedate: string (nullable = false)



In [110]:
 test3.agg(fn.sum('id'),fn.min('age'),fn.max('label')).show()

+-------+--------+----------+
|sum(id)|min(age)|max(label)|
+-------+--------+----------+
|    937|      19|         1|
+-------+--------+----------+



In [77]:
# 筛选语句
# 获得 age=22的id
swimmers.select(fn.col('id').alias('a') ,'name').filter('age>=22').show()

+----+-------+
|   a|   name|
+----+-------+
|null|Michael|
| 235|Michael|
| 234|Michael|
| 345| Simone|
+----+-------+



In [31]:
# 另一种方法
swimmers.select(swimmers.id,swimmers.age).filter(swimmers.age>=22).show()

+---+---+
| id|age|
+---+---+
|234| 22|
|345| 23|
+---+---+



In [39]:
# 如果想获得眼睛颜色以 b开头的名字
swimmers.select(swimmers.name).filter("eyecolor like 'b%'").show()

+------+
|  name|
+------+
| Katie|
|Simone|
+------+



In [44]:
### 利用sql查询
# 行数
spark.sql("select count(*) as sum1 from swimmers").show()

+----+
|sum1|
+----+
|   3|
+----+



In [90]:
# 利用where子句
spark.sql("select id,age from swimmers where age=19 and name='Katie'").show()

+---+---+
| id|age|
+---+---+
|123| 19|
+---+---+



In [88]:
# 查眼镜颜色以b开头的
spark.sql("select * from swimmers where eyeColor like 'b%' limit 1").show()

+---+-----+---+--------+
| id| name|age|eyeColor|
+---+-----+---+--------+
|123|Katie| 19|   brown|
+---+-----+---+--------+



In [None]:
# 读取 csv数据源

flightPerfFilePath = "/databricks-datasets/flights/departuredelays.csv"
airportsFilePath = "/databricks-datasets/flights/airport-codes-na.txt"

# Obtain Airports dataset
airports = spark.read.csv(airportsFilePath, header='true', inferSchema='true', sep='\t')
airports.createOrReplaceTempView("airports")

# Obtain Departure Delays dataset
flightPerf = spark.read.csv(flightPerfFilePath, header='true')
flightPerf.createOrReplaceTempView("FlightPerformance")

# Cache the Departure Delays dataset 
flightPerf.cache()

In [91]:
# 为列表生成索引
col_list = ['username','id','gender','age']
mapping_list = list(enumerate(sorted(col_list)))
print(mapping_list)

[(0, 'age'), (1, 'gender'), (2, 'id'), (3, 'username')]


In [92]:

#将mapping_list中的key和value互换位置,并转换为dict
revs_maplist = {value:idx for [idx,value] in mapping_list}
print(revs_maplist)

{'id': 2, 'gender': 1, 'username': 3, 'age': 0}


In [97]:
test_list = [1,2,-3,10,None,-5,0,10.5] 
#for循环简写1 (此处if在for循环后面)
result1 = [item  for item in test_list if item != None]
print(result1) 
#for循环简写2 (此处if-else必须同时存在且在for循环前面)
result2  = [item if item > 0 else 0 for item in result1]
print(result2)



[1, 2, -3, 10, -5, 0, 10.5]
[1, 2, 0, 10, 0, 0, 10.5]


In [116]:
df = sc.parallelize([(1, 2, 3, 'a b c'),
                     (4, 5, 6, 'd e f'),
                     (7, 8, 9, 'g h i')]).toDF(['col1', 'col2', 'col3','col4'])


In [117]:
from pyspark.sql.functions import split, explode
df.withColumn('col4',explode(split('col4',' '))).show()

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   1|   2|   3|   a|
|   1|   2|   3|   b|
|   1|   2|   3|   c|
|   4|   5|   6|   d|
|   4|   5|   6|   e|
|   4|   5|   6|   f|
|   7|   8|   9|   g|
|   7|   8|   9|   h|
|   7|   8|   9|   i|
+----+----+----+----+



In [6]:
from pyspark.sql import Row
df = sc.parallelize([Row(r=Row(a=1, b="b"))]).toDF()
df.show()

+------+
|     r|
+------+
|[1, b]|
+------+



In [119]:
df.select(df.r.getField("b")).show()

+---+
|r.b|
+---+
|  b|
+---+



In [None]:
df.select(df.age.cast("string").alias('ages')).collect()
# [Row(ages=u'2'), Row(ages=u'5')]
df.select(df.age.cast(StringType()).alias('ages')).collect()
# [Row(ages=u'2'), Row(ages=u'5')]
# 将字符串转为 int 型
df.select(df.source.cast("int").alias('sources')).take(20)


In [122]:
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result=rdd.groupBy(lambda x:x%2)
result.collect()

[(0, <pyspark.resultiterable.ResultIterable at 0x26cde682208>),
 (1, <pyspark.resultiterable.ResultIterable at 0x26cde682128>)]

In [124]:
resultGp=[(x,sorted(y)) for (x,y) in result.collect()]
print(resultGp)

[(0, [2, 8]), (1, [1, 1, 3, 5])]


In [125]:
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
rddGp=rdd.groupByKey()
print(rdd.collect())
print(rddGp.collect())

[('a', 1), ('b', 1), ('a', 1)]
[('a', <pyspark.resultiterable.ResultIterable object at 0x0000026CDE638748>), ('b', <pyspark.resultiterable.ResultIterable object at 0x0000026CDE6382E8>)]


In [126]:
def f(x):
    a=list(x)#直接使用x会报错，说明sequence并不能用for
    for i in range(len(a)):
        a[i]=a[i]*2
    return a
gpMp1=rddGp.mapValues(len)
gpMp2=rddGp.mapValues(list)
gpMp3=rddGp.mapValues(f)
print(gpMp1.collect())
print(gpMp2.collect())
print(gpMp3.collect())

[('a', 2), ('b', 1)]
[('a', [1, 1]), ('b', [1])]
[('a', [2, 2]), ('b', [2])]


In [131]:
 df=spark.createDataFrame([('2015-04-08',)],['a'])
df.select(fn.date_format('a','YYYYMMDD').alias('date')).collect()

[Row(date='20150498')]

In [133]:
df.describe().show()

+-------+----------+
|summary|         a|
+-------+----------+
|  count|         1|
|   mean|      null|
| stddev|      null|
|    min|2015-04-08|
|    max|2015-04-08|
+-------+----------+



In [144]:
#将数组或者矩阵存储为csv文件可以使用如下代码实现：
#numpy.savetxt('new.csv', my_matrix, delimiter = ',')

SyntaxError: invalid character in identifier (<ipython-input-144-bd66ec2c06a6>, line 3)

In [156]:
import numpy as np

csv_file= np.loadtxt(open("C:\\Users\\Administrator\\Desktop\\predict_dc.csv",encoding='utf-8'),dtype=np.str,delimiter=',',unpack=False)

data = csv_file[1:,0:].astype(np.float)

print(data)
np.savetxt('C:\\Users\\Administrator\\Desktop\\predict_dc111.csv', data, delimiter = ',')


[[ 4.8  3.   1.4  0.1]
 [ 5.1  3.8  1.9  0.4]
 [ 6.1  3.   4.6  1.4]
 [ 5.1  3.3  1.7  0.5]
 [ 5.   2.3  3.3  1. ]
 [ 5.   2.   3.5  1. ]
 [ 5.7  2.8  4.5  1.3]
 [ 6.1  2.9  4.7  1.4]
 [ 6.3  3.3  4.7  1.6]
 [ 6.5  2.8  4.6  1.5]
 [ 5.8  2.7  3.9  1.2]
 [ 5.9  3.   4.2  1.5]
 [ 5.8  4.   1.2  0.2]
 [ 5.2  3.4  1.4  0.2]
 [ 5.7  3.   4.2  1.2]
 [ 5.1  3.5  1.4  0.3]
 [ 5.7  4.4  1.5  0.4]
 [ 5.   3.5  1.3  0.3]
 [ 6.1  2.8  4.   1.3]
 [ 6.4  3.2  4.5  1.5]
 [ 5.8  2.7  4.1  1. ]
 [ 6.8  2.8  4.8  1.4]
 [ 5.7  2.8  4.1  1.3]
 [ 4.4  3.   1.3  0.2]
 [ 4.6  3.6  1.   0.2]
 [ 5.4  3.4  1.7  0.2]
 [ 5.   3.6  1.4  0.2]
 [ 5.   3.2  1.2  0.2]
 [ 4.6  3.2  1.4  0.2]
 [ 6.7  3.1  4.7  1.5]]


### 三种join

In [157]:
rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(4,'Justin', 21),(5,'Cindy', 20)])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema)
df.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  1| Alice| 18|
|  2|  Andy| 19|
|  3|   Bob| 17|
|  4|Justin| 21|
|  5| Cindy| 20|
+---+------+---+



In [158]:
rdd2 = sc.parallelize([('Alice', 160),('Andy', 159),('Bob', 170),('Cindy', 165),('Rose', 160)])
schema2 = StructType([
    StructField("name", StringType(), True),
    StructField("height", IntegerType(), True)
])
df2 = spark.createDataFrame(rdd2, schema2)
df2.show() 

+-----+------+
| name|height|
+-----+------+
|Alice|   160|
| Andy|   159|
|  Bob|   170|
|Cindy|   165|
| Rose|   160|
+-----+------+



In [159]:
#创建第三个dataframe
rdd3 = sc.parallelize([(1,'Alice', 160),(2,'Andy', 159),(3,'Tom', 175),(4,'Justin', 171),(5,'Cindy', 165)])
schema3 = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("height", IntegerType(), True)
])
df3 = spark.createDataFrame(rdd3, schema3)
df3.show()


+---+------+------+
| id|  name|height|
+---+------+------+
|  1| Alice|   160|
|  2|  Andy|   159|
|  3|   Tom|   175|
|  4|Justin|   171|
|  5| Cindy|   165|
+---+------+------+



In [162]:
### inner join
df.join(df2,'name','inner').select('id',df.name,'age','height').orderBy('id').show()

+---+-----+---+------+
| id| name|age|height|
+---+-----+---+------+
|  1|Alice| 18|   160|
|  2| Andy| 19|   159|
|  3|  Bob| 17|   170|
|  5|Cindy| 20|   165|
+---+-----+---+------+



In [164]:
df.join(df2,'name','inner').select('id',df.name,'age','height').orderBy('id',ascending=False).show()

+---+-----+---+------+
| id| name|age|height|
+---+-----+---+------+
|  5|Cindy| 20|   165|
|  3|  Bob| 17|   170|
|  2| Andy| 19|   159|
|  1|Alice| 18|   160|
+---+-----+---+------+



In [None]:
### 不写连接类型默认是inner
df.join(df2,'name','inner').select('id',df.name,'age','height').orderBy('id',ascending=False).show()

In [175]:
# 如果内连接的参数不只一个，将参数放在一个列表中
df.join(df3, [df.id==df3.id, df.name==df3.name], "inner").select(df.id).orderBy(df.id).show()

+---+
| id|
+---+
|  1|
|  2|
|  4|
|  5|
+---+



In [181]:
# outer join 
# full outer join全外连接 
# 注意：不能用…select(df.name)，会报错

aa = df.join(df2, "name", "outer").select("id", "name", "age", "height").orderBy("id")
means={'id': 4.0,'height':170}
aa.fillna(means).show()

+---+------+----+------+
| id|  name| age|height|
+---+------+----+------+
|  4|  Rose|null|   160|
|  1| Alice|  18|   160|
|  2|  Andy|  19|   159|
|  3|   Bob|  17|   170|
|  4|Justin|  21|   170|
|  5| Cindy|  20|   165|
+---+------+----+------+



In [179]:
#left outer join

df.join(df2, "name", "left").select("id", "name", "age", "height").orderBy("id").show()
# 或者
df.join(df2, "name", "left").select("id", df.name, "age", "height").orderBy("id").show()


+---+------+---+------+
| id|  name|age|height|
+---+------+---+------+
|  1| Alice| 18|   160|
|  2|  Andy| 19|   159|
|  3|   Bob| 17|   170|
|  4|Justin| 21|  null|
|  5| Cindy| 20|   165|
+---+------+---+------+

+---+------+---+------+
| id|  name|age|height|
+---+------+---+------+
|  1| Alice| 18|   160|
|  2|  Andy| 19|   159|
|  3|   Bob| 17|   170|
|  4|Justin| 21|  null|
|  5| Cindy| 20|   165|
+---+------+---+------+



In [182]:
# right outer join

df.join(df2, "name", "right").select("id", "name", "age", "height").orderBy("id").show()
# 或者
df.join(df2, "name", "right").select("id", df2.name, "age", "height").orderBy("id").show()


+----+-----+----+------+
|  id| name| age|height|
+----+-----+----+------+
|null| Rose|null|   160|
|   1|Alice|  18|   160|
|   2| Andy|  19|   159|
|   3|  Bob|  17|   170|
|   5|Cindy|  20|   165|
+----+-----+----+------+

+----+-----+----+------+
|  id| name| age|height|
+----+-----+----+------+
|null| Rose|null|   160|
|   1|Alice|  18|   160|
|   2| Andy|  19|   159|
|   3|  Bob|  17|   170|
|   5|Cindy|  20|   165|
+----+-----+----+------+



In [184]:
df2.orderBy("height",fn.desc("name")).show()

+-----+------+
| name|height|
+-----+------+
| Andy|   159|
| Rose|   160|
|Alice|   160|
|Cindy|   165|
|  Bob|   170|
+-----+------+



In [185]:
df2.orderBy(["height","name"]).show()

+-----+------+
| name|height|
+-----+------+
| Andy|   159|
|Alice|   160|
| Rose|   160|
|Cindy|   165|
|  Bob|   170|
+-----+------+



In [195]:
#cast()函数 
#常用来做类型转换

df.select(df.age.cast(StringType())).show()
# 或者
df.selectExpr("cast(age as string)age").prinSchema()

+---+
|age|
+---+
| 18|
| 19|
| 17|
| 21|
| 20|
+---+

root
 |-- age: string (nullable = true)



In [7]:
##row_number().over()
from pyspark.sql import Window
rdd = sc.parallelize([(1,'Alice', 18),(2,'Andy', 19),(3,'Bob', 17),(1,'Justin', 21),(1,'Cindy', 20)])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.createDataFrame(rdd, schema)



In [24]:
import pyspark.sql.functions as fn
#按照每个组内的年龄排序，组外的分布并不管

a =df.withColumn("rn", fn.row_number().over(Window.partitionBy("id").orderBy("age"))).show()

+---+------+---+---+
| id|  name|age| rn|
+---+------+---+---+
|  1| Alice| 18|  1|
|  1| Cindy| 20|  2|
|  1|Justin| 21|  3|
|  3|   Bob| 17|  1|
|  2|  Andy| 19|  1|
+---+------+---+---+



In [193]:
#按照年龄排序，组外面分布也管
df.withColumn("rn", fn.row_number().over(Window.partitionBy("id").orderBy("age"))).orderBy("age").show()

+---+------+---+---+
| id|  name|age| rn|
+---+------+---+---+
|  1| Alice| 18|  1|
|  1| Cindy| 20|  2|
|  1|Justin| 21|  3|
|  3|   Bob| 17|  1|
|  2|  Andy| 19|  1|
+---+------+---+---+



In [200]:
df.groupby('id').count().show()

+---+-----+
| id|count|
+---+-----+
|  1|    3|
|  3|    1|
|  2|    1|
+---+-----+



In [202]:
df.filter(df.name.like('Al%')).show()

+---+-----+---+
| id| name|age|
+---+-----+---+
|  1|Alice| 18|
+---+-----+---+



In [204]:
df.select(fn.format_number('id',4).alias('v')).show()

+------+
|     v|
+------+
|1.0000|
|2.0000|
|3.0000|
|1.0000|
|1.0000|
+------+



In [26]:
from pyspark.sql import Window
rdd = sc.parallelize([('牛奶+进口','12215,9434,13604'),('除湿','15932,1283,1661'),('除湿','15932,1283,1661'),('除湿','15932,1283,1661')])
schema = StructType([
    StructField("key_word", StringType(), True),
    StructField("high_value", StringType(), True)
])
df = spark.createDataFrame(rdd, schema)


In [36]:

rdd3 = sc.parallelize([(3998,'15932'),(6909,'8302'),(4565,'12215'),(12345,'1661')])
schema3 = StructType([
    StructField("brand_cd", IntegerType(), True),
    StructField("item", StringType(), True)
])
df3 = spark.createDataFrame(rdd3, schema3)

In [37]:
df.createOrReplaceTempView('guanjianci')
df3.createOrReplaceTempView('pinpaibiao')
spark.sql(
    '''
select brand_cd, key_word,chaxunliang,rn from 
(
select   brand_cd, key_word,chaxunliang,row_number() over(partition by  brand_cd, key_word  order by chaxunliang desc) as rn  from
(
select brand_cd, key_word ,count(*) as chaxunliang  from  guanjianci a   join pinpaibiao b  on ','||a.high_value||','  like '%,' || b.item || ',%'
    group by 1,2 ) a)b where rn<=20
    '''
).show()  

+--------+--------+-----------+---+
|brand_cd|key_word|chaxunliang| rn|
+--------+--------+-----------+---+
|   12345|      除湿|          3|  1|
|    4565|   牛奶+进口|          1|  1|
|    3998|      除湿|          3|  1|
+--------+--------+-----------+---+

