In [1]:
import pandas as pd
import numpy as np
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf, HiveContext

In [2]:
# Data that came from a sheet with merged ("large") cells: only the first row
# of each merged group carries a value in columns A and B.
# When a row has no D level (the final classification), its C value is used
# as the final classification instead.

hierarchy = {
    "A": ["a", np.nan, np.nan, "b", np.nan],
    "B": ["a1", "a2", np.nan, "b1", "b2"],
    "C": ["a1-1", "a2-1", "a2-2", "b1-1", "b2-1"],
    "D": [np.nan, np.nan, np.nan, "end-1", "end-2"],
}

df = pd.DataFrame(hierarchy)

# Forward-fill the group labels so every row carries its A/B value.
df[["A", "B"]] = df[["A", "B"]].ffill()

# Rows without a D value: C is moved into D and C itself becomes NaN.
# Both right-hand sides are evaluated before either column is overwritten.
d_missing = pd.isnull(df["D"])
df["C"], df["D"] = (
    np.where(d_missing, df["D"], df["C"]),
    np.where(d_missing, df["C"], df["D"]),
)

# Show the frame in full when printing.
for option in ("display.max_columns", "display.max_rows"):
    pd.set_option(option, None)

# 1-based row identifier derived from the default index.
df["ID"] = df.index + 1
print(df)

   A   B     C      D  ID
0  a  a1   NaN   a1-1   1
1  a  a2   NaN   a2-1   2
2  a  a2   NaN   a2-2   3
3  b  b1  b1-1  end-1   4
4  b  b2  b2-1  end-2   5


In [3]:
# Copy cells of column A whose (stripped) value is "a" into column B;
# every other row gets an empty string in B.

raw = {
    "A": [" a ", " ", " c "],
    "B": ["", "", ""],
}
df = pd.DataFrame(raw)

# Trim surrounding whitespace in every column (all columns hold strings here).
for name in list(df.columns):
    df[name] = df[name].str.strip()

df["source"] = "haha"

# Keep A's value where it equals "a", otherwise fall back to "".
df["B"] = df["A"].where(df["A"] == "a", "")
print(df)

   A  B source
0  a  a   haha
1         haha
2  c      haha


In [5]:
# Add a sequential id to a Spark DataFrame.
# c1, c2, c3 are looked up in df1 and p4 is looked up in df2; joining df3
# against both keeps only the df3 rows that have a partner in each table and
# records the partners' ids as d1_id / d2_id.

sc = SparkContext()
try:
    sqlContext = HiveContext(sc)

    df1 = sqlContext.createDataFrame(
        [(1, "a", "aa", "aaa"), (2, "b", "bb", "bbb"),
         (3, "c", "cc", "ccc"), (4, "d", "dd", "ddd")],
        ("id", "c1", "c2", "c3"))

    df2 = sqlContext.createDataFrame(
        [(1, "1", "11", "111", "p1"), (2, "2", "22", "", "p2"), (3, "3", "33", "", "p3")],
        ("id", "p1", "p2", "p3", "p4"))

    df3 = sqlContext.createDataFrame(
        [("a", "aa", "aaa", "p1", 0, 0), ("b", "bb", "bbb", "p1", 0, 0),
         ("z", "zz", "", "p9", 0, 0), ("c", "cc", "ccc", "p2", 0, 0)],
        ("c1", "c2", "c3", "p4", "d1_id", "d2_id"))

    # Filter on the source ids *before* projecting, so the predicate does not
    # reference columns that the select has already dropped.
    matched = (
        df3
        .join(df1, (df3.c1 == df1.c1) & (df3.c2 == df1.c2) & (df3.c3 == df1.c3), "inner")
        .join(df2, (df3.p4 == df2.p4), "inner")
        .filter((df1.id > 0) & (df2.id > 0))
        .select(lit(0).alias("id"), df3.c1, df3.c2, df3.c3, df3.p4,
                df1.id.alias("d1_id"), df2.id.alias("d2_id"))
    )

    # The placeholder `id` is the constant 0, so partitioning by it puts every
    # row into one window partition. Ordering by that same constant would make
    # all rows tie and leave the numbering to Spark's execution order; ordering
    # by the source ids makes it reproducible. Descending order reproduces the
    # numbering of the recorded run (c -> 1, b -> 2, a -> 3).
    id_window = Window.partitionBy("id").orderBy(
        matched["d1_id"].desc(), matched["d2_id"].desc())
    df = matched.withColumn("id", row_number().over(id_window))
    df.show()
finally:
    # Always release the context: a SparkContext left running after an error
    # makes the next `SparkContext()` call in this kernel fail.
    sc.stop()

+---+---+---+---+---+-----+-----+
| id| c1| c2| c3| p4|d1_id|d2_id|
+---+---+---+---+---+-----+-----+
|  1|  c| cc|ccc| p2|    3|    2|
|  2|  b| bb|bbb| p1|    2|    1|
|  3|  a| aa|aaa| p1|    1|    1|
+---+---+---+---+---+-----+-----+



In [6]:
# Demonstrates NumPy aliasing: plain assignment binds a second name to the
# same array object, so a write through `new` is visible through `old`.
# NOTE(review): if an independent array was intended, use `old.copy()`.
old = np.ones((2, 3), dtype=int)
new = old

# Zero the first two entries of the first row (via the alias).
new[0, 0:2] = 0
print(old)

[[0 0 1]
 [1 1 1]]
