In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) a SparkSession for this demo.
spark = (
    SparkSession.builder
    .appName("Spark SQL Example")
    .getOrCreate()
)

# Build a small in-memory DataFrame of (Name, Age) rows.
columns = ["Name", "Age"]
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
df = spark.createDataFrame(data, columns)

# Expose the DataFrame to Spark SQL under the view name "people".
df.createOrReplaceTempView("people")

# Select everyone older than 30 with plain SQL and print the matching rows.
over_30_query = "SELECT Name, Age FROM people WHERE Age > 30"
result = spark.sql(over_30_query)
result.show()

# Release the session's resources.
spark.stop()

In [None]:
<!-- Note: you may need to restart the kernel to use updated packages. -->
pip install kafka-python

[33mDEPRECATION: Loading egg at /Users/arwen/anaconda3/lib/python3.11/site-packages/dtkApi-1.0.10-py3.11.egg is deprecated. pip 23.3 will enforce this behaviour change. A possible replacement is to use pip for package installation..[0m[33m
[0mLooking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Note: you may need to restart the kernel to use updated packages.


In [None]:
### 如何本地安装Kafka环境

1. **下载Kafka**:
    - 访问 [Kafka官网](https://kafka.apache.org/downloads) 下载所需版本的Kafka。
    - 选择一个镜像站点并下载Kafka的二进制文件。下文以`kafka_2.13-2.8.0.tgz`（Scala 2.13构建的Kafka 2.8.0）为例；如果下载的是其他版本，请在后续命令中替换相应的文件名和目录名。

2. **解压文件**:
    ```bash
    tar -xzf kafka_2.13-2.8.0.tgz
    cd kafka_2.13-2.8.0
    ```

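3. **启动ZooKeeper**:
    本示例使用的Kafka 2.8.0依赖ZooKeeper来管理集群元数据，因此需要先启动ZooKeeper。（较新的Kafka版本支持不依赖ZooKeeper的KRaft模式，Kafka 4.0起已完全移除ZooKeeper；使用这些版本时请按照官方Quickstart先用`bin/kafka-storage.sh`格式化存储目录，再直接启动Kafka，跳过本步骤。）
    ```bash
    bin/zookeeper-server-start.sh config/zookeeper.properties
    ```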

4. **启动Kafka服务器**:
    在另一个终端窗口中启动Kafka服务器。
    ```bash
    bin/kafka-server-start.sh config/server.properties
    ```

5. **创建主题**:
    创建一个名为`my_topic`的主题。
    ```bash
    bin/kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
    ```

6. **生产消息**:
    启动一个Kafka生产者，在出现的`>`提示符后输入消息并回车即可发送一条消息，按`Ctrl+C`退出生产者。
    ```bash
    bin/kafka-console-producer.sh --topic my_topic --bootstrap-server localhost:9092
    > Hello, Kafka!
    ```

7. **消费消息**:
    启动一个Kafka消费者来消费消息。
    ```bash
    bin/kafka-console-consumer.sh --topic my_topic --from-beginning --bootstrap-server localhost:9092
    ```

通过以上步骤，你已经在本地成功安装并运行了Kafka环境。

In [None]:
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError

# Connection settings shared by the producer and the consumer.
BOOTSTRAP_SERVERS = ['localhost:9092']
TOPIC = 'my_topic'
# How long the consumer waits for a message before giving up (ms). Without this,
# iterating a KafkaConsumer blocks forever when the topic has no messages.
CONSUMER_TIMEOUT_MS = 10_000

# --- Produce one message ---------------------------------------------------
producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
try:
    future = producer.send(TOPIC, b'Hello, Kafka!')
    # Block until the broker acknowledges (or 10 s pass) so send failures surface here.
    try:
        record_metadata = future.get(timeout=10)
        print(f"Sent to {record_metadata.topic}[{record_metadata.partition}] "
              f"at offset {record_metadata.offset}")
    except KafkaError as e:
        print(f"Error: {e}")
finally:
    # close() waits for buffered records to be sent and releases the producer's
    # network connections and background I/O thread.
    producer.close()

# --- Consume one message ---------------------------------------------------
# auto_offset_reset='earliest' with no group_id starts from the beginning of the
# topic, so the first message printed may be an older one (e.g. from the console
# producer), not necessarily the one sent above.
consumer = KafkaConsumer(
    TOPIC,
    bootstrap_servers=BOOTSTRAP_SERVERS,
    auto_offset_reset='earliest',
    consumer_timeout_ms=CONSUMER_TIMEOUT_MS,
)
try:
    for message in consumer:
        print(f"Received message: {message.value.decode('utf-8')}")
        break  # consume a single message, then stop
    else:
        # The iterator ended because consumer_timeout_ms elapsed with no message.
        print(f"No message received within {CONSUMER_TIMEOUT_MS / 1000:.0f} s")
finally:
    consumer.close()

In [None]:
from pyflink.table import EnvironmentSettings, TableEnvironment

# Create a TableEnvironment in streaming mode.
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
table_env = TableEnvironment.create(env_settings)

# The Kafka SQL connector is not bundled with PyFlink. If the DDL/query below fails
# with "Could not find any factory for identifier 'kafka'", register the
# flink-sql-connector-kafka JAR that matches your Flink version, e.g.:
# table_env.get_config().set("pipeline.jars", "file:///path/to/flink-sql-connector-kafka.jar")
# (older PyFlink: table_env.get_config().get_configuration().set_string("pipeline.jars", ...))

# Kafka-backed source table. Notes on the options:
# - 'properties.group.id': some older Flink Kafka connector versions require it for
#   sources; it is harmless on newer ones.
# - 'json.ignore-parse-errors': my_topic also receives the plain-text 'Hello, Kafka!'
#   messages produced earlier in this notebook. Without this option the first
#   non-JSON record fails the whole job; with it, unparseable records are skipped.
kafka_source_ddl = """
CREATE TABLE kafka_source (
    `user_id` STRING,
    `item_id` STRING,
    `behavior` STRING,
    `ts` TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'my_topic',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink_sql_demo',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
)
"""

# Register the Kafka table in the catalog.
table_env.execute_sql(kafka_source_ddl)

# Keep only purchase events.
result_table = table_env.sql_query("SELECT user_id, item_id, behavior, ts FROM kafka_source WHERE behavior = 'buy'")

# Table.execute() submits the job; TableResult.print() streams rows to stdout.
# The unified TableEnvironment has no to_append_stream() (that belongs to
# StreamTableEnvironment), and TableEnvironment.execute() is deprecated/removed, so
# the previous two calls could not run. The Kafka source is unbounded, so this call
# keeps printing until the cell is interrupted.
result_table.execute().print()

In [None]:
from collections import Counter
import random

from streamparse import Grouping, Topology, Spout, Bolt
from streamparse.storm import Tuple
from streamparse import run


class RandomSentenceSpout(Spout):
    """Spout that endlessly emits a randomly chosen sentence on the 'sentence' field."""

    outputs = ['sentence']

    def initialize(self, stormconf, context):
        self.sentences = [
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"
        ]

    def next_tuple(self):
        sentence = random.choice(self.sentences)
        self.emit([sentence])


class SplitSentenceBolt(Bolt):
    """Bolt that splits each incoming sentence on whitespace and emits one tuple per word."""

    outputs = ['word']

    def process(self, tup):
        sentence = tup.values[0]
        for word in sentence.split():
            self.emit([word])


class WordCountBolt(Bolt):
    """Bolt that keeps a running count per word and emits (word, count) on every update."""

    outputs = ['word', 'count']

    def initialize(self, stormconf, context):
        # Per-task running totals; Counter defaults missing words to 0.
        self.counts = Counter()

    def process(self, tup):
        word = tup.values[0]
        self.counts[word] += 1
        self.emit([word, self.counts[word]])
        self.log(f'{word}: {self.counts[word]}')


class WordCountTopology(Topology):
    """Spout -> sentence splitter -> per-word counter."""

    random_sentence_spout = RandomSentenceSpout.spec()
    split_sentence_bolt = SplitSentenceBolt.spec(inputs=[random_sentence_spout])
    # Fields grouping on 'word' routes every occurrence of a word to the same
    # WordCountBolt task, so per-word counts stay correct when the bolt runs with par > 1.
    word_count_bolt = WordCountBolt.spec(inputs={split_sentence_bolt: Grouping.fields('word')})


if __name__ == '__main__':
    # `from streamparse import run` yields the `streamparse.run` module (streamparse's
    # per-component launcher), not a function, so `run(WordCountTopology)` raised
    # TypeError. Streamparse topologies are deployed from a project created with
    # `sparse quickstart` (components under src/, topology under topologies/) using
    # the `sparse` CLI.
    print("Deploy this topology with the streamparse CLI: "
          "`sparse run` (local) or `sparse submit` (cluster).")

In [None]:
# Create a Hive table from a CSV file, query it, then write and scan a few rows in HBase.
from pyhive import hive
import happybase

# --- Hive ------------------------------------------------------------------
# Connect to HiveServer2 (replace the placeholder user name).
hive_conn = hive.Connection(host='localhost', port=10000, username='your_username')
try:
    hive_cursor = hive_conn.cursor()

    # Create the Hive table: plain text with comma-separated fields.
    hive_cursor.execute("CREATE TABLE IF NOT EXISTS users (id INT, name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")
    # LOCAL: the path is read from the HiveServer2 host's local filesystem.
    # OVERWRITE replaces the table contents so re-running this cell does not
    # append duplicate rows.
    hive_cursor.execute("LOAD DATA LOCAL INPATH '/path/to/your/data.csv' OVERWRITE INTO TABLE users")

    # Query the Hive table.
    hive_cursor.execute("SELECT * FROM users")
    for result in hive_cursor.fetchall():
        print(result)
finally:
    # Close the Hive connection even if a statement above failed.
    hive_conn.close()

# --- HBase -----------------------------------------------------------------
# happybase talks to HBase through its Thrift server.
hbase_conn = happybase.Connection('localhost')
try:
    # Table.put fails on a missing table, so create it with column family 'cf' on first run.
    if b'users' not in hbase_conn.tables():
        hbase_conn.create_table('users', {'cf': dict()})
    table = hbase_conn.table('users')

    # Insert rows into HBase.
    table.put(b'row1', {b'cf:name': b'Alice', b'cf:age': b'30'})
    table.put(b'row2', {b'cf:name': b'Bob', b'cf:age': b'25'})

    # Scan the HBase table. Distinct names avoid clobbering the notebook-level `data`.
    for row_key, columns in table.scan():
        print(row_key, columns)
finally:
    hbase_conn.close()

In [None]:
# Load a CSV from HDFS into a Hive table, then copy the Hive rows into an HBase table.
from pyhive import hive
import happybase

# Connect to HiveServer2 (replace the placeholder user name) and to HBase (via Thrift).
hive_conn = hive.Connection(host='localhost', port=10000, username='your_username')
hbase_conn = happybase.Connection('localhost')
try:
    hive_cursor = hive_conn.cursor()

    # Create the Hive table: plain text with comma-separated fields.
    hive_cursor.execute("CREATE TABLE IF NOT EXISTS hdfs_data (id INT, name STRING, age INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','")

    # Load the HDFS file into Hive. Without LOCAL, Hive *moves* the source file into
    # the table's storage location, so the file is gone after the first successful
    # run: re-running this statement needs a fresh copy at that path. OVERWRITE keeps
    # a re-run from appending duplicate rows.
    hive_cursor.execute("LOAD DATA INPATH '/path/to/hdfs/data.csv' OVERWRITE INTO TABLE hdfs_data")

    # Query once and reuse the rows for both display and the HBase copy.
    hive_cursor.execute("SELECT * FROM hdfs_data")
    hive_rows = hive_cursor.fetchall()
    for result in hive_rows:
        print(result)

    # Table.put fails on a missing table, so create it with column family 'cf' on first run.
    if b'hbase_table' not in hbase_conn.tables():
        hbase_conn.create_table('hbase_table', {'cf': dict()})
    table = hbase_conn.table('hbase_table')

    # Copy Hive rows into HBase, keyed as b'row<id>'.
    for row_id, name, age in hive_rows:
        if row_id is None:
            # Values Hive cannot parse as INT come back as NULL (e.g. a CSV header
            # line); such rows have no usable row key.
            continue
        cells = {}
        if name is not None:
            cells[b'cf:name'] = name.encode('utf-8')
        if age is not None:
            cells[b'cf:age'] = str(age).encode('utf-8')
        if cells:
            table.put(f'row{row_id}'.encode('utf-8'), cells)

    # Scan the HBase table.
    for row_key, columns in table.scan():
        print(row_key, columns)
finally:
    # Close both connections even if a step above failed.
    hive_conn.close()
    hbase_conn.close()

In [None]:
import pyarrow as pa
from pyarrow import fs

# Connect to HDFS through libhdfs (requires a Hadoop client / JAVA_HOME / CLASSPATH
# on this machine). The legacy `pyarrow.hdfs` module used previously is deprecated
# since pyarrow 2.0 and has been removed in newer releases; pyarrow.fs.HadoopFileSystem
# is its replacement.
hdfs_client = fs.HadoopFileSystem(host='localhost', port=9000, user='your_username')

HDFS_FILE = '/path/to/hdfs/file.txt'

# Sample payload.
data = b"Hello, HDFS!"

# Write the payload to HDFS (creates or truncates the file).
with hdfs_client.open_output_stream(HDFS_FILE) as f:
    f.write(data)

# Read the file back.
with hdfs_client.open_input_stream(HDFS_FILE) as f:
    print(f.read())

# pyarrow.fs.HadoopFileSystem has no close(); the streams above are closed by
# their `with` blocks.

In [None]:
import csv
import io

import happybase
import pyarrow as pa
from pyarrow import fs

# Sample rows written to both HBase and HDFS.
data = [
    {'id': 1, 'name': 'Alice', 'age': 30},
    {'id': 2, 'name': 'Bob', 'age': 25},
    {'id': 3, 'name': 'Charlie', 'age': 35}
]

# --- Store the rows in HBase -----------------------------------------------
hbase_conn = happybase.Connection('localhost')
try:
    # Table.put fails on a missing table, so create it with column family 'cf' on first run.
    if b'users' not in hbase_conn.tables():
        hbase_conn.create_table('users', {'cf': dict()})
    table = hbase_conn.table('users')

    # batch() buffers the mutations and sends them when the with-block exits.
    with table.batch() as batch:
        for row in data:
            row_key = f'row{row["id"]}'.encode('utf-8')
            batch.put(row_key, {
                b'cf:name': row['name'].encode('utf-8'),
                b'cf:age': str(row['age']).encode('utf-8')
            })
finally:
    hbase_conn.close()

# --- Store the rows in HDFS as CSV -----------------------------------------
# The csv module quotes fields containing commas/quotes/newlines, which naive
# string joining would not.
csv_buffer = io.StringIO()
writer = csv.DictWriter(csv_buffer, fieldnames=['id', 'name', 'age'], lineterminator='\n')
writer.writeheader()
writer.writerows(data)
file_content = csv_buffer.getvalue()

# The legacy `pyarrow.hdfs` module is deprecated since pyarrow 2.0 and removed in
# newer releases; pyarrow.fs.HadoopFileSystem replaces it (no close() needed).
hdfs_client = fs.HadoopFileSystem(host='localhost', port=9000, user='your_username')

# Write the CSV to HDFS (creates or truncates the file).
with hdfs_client.open_output_stream('/path/to/hdfs/users.csv') as f:
    f.write(file_content.encode('utf-8'))

In [None]:
### 如何本地安装HBase环境

1. **下载HBase**:
    - 访问 [HBase官网](https://hbase.apache.org/downloads.html) 下载所需版本的HBase。
    - 选择一个镜像站点并下载HBase的二进制文件。下文以`hbase-2.4.8-bin.tar.gz`为例；如果下载的是其他版本，请在后续命令中替换相应的文件名和目录名。

2. **解压文件**:
    ```bash
    tar -xzf hbase-2.4.8-bin.tar.gz
    cd hbase-2.4.8
    ```

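3. **配置HBase**:
    HBase需要Java运行环境：请先安装JDK，并在`conf/hbase-env.sh`（或系统环境变量）中设置`JAVA_HOME`。然后编辑`conf/hbase-site.xml`文件，添加以下配置（将`/path/to/...`替换为本机上的实际目录）：
    ```xml
    <configuration>
        <property>
            <name>hbase.rootdir</name>
            <value>file:///path/to/hbase</value>
        </property>
        <property>
            <name>hbase.zookeeper.property.dataDir</name>
            <value>/path/to/zookeeper</value>
        </property>
        <property>
            <!-- 使用本地文件系统（file://）时需关闭此检查，否则HMaster可能启动失败 -->
            <name>hbase.unsafe.stream.capability.enforce</name>
            <value>false</value>
        </property>
    </configuration>
    ```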

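4. **启动HBase**:
    ```bash
    ./bin/start-hbase.sh
    ```
    本笔记本中的Python示例使用`happybase`，它通过Thrift接口访问HBase，因此还需要启动HBase Thrift服务（默认端口9090）：
    ```bash
    ./bin/hbase-daemon.sh start thrift
    ```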

5. **验证安装**:
    通过HBase shell验证安装是否成功：
    ```bash
    ./bin/hbase shell
    ```

通过以上步骤，你已经在本地成功安装并运行了HBase环境。