In [79]:
import fuzzingbook.bookutils
from typing import List, Any, Optional, Union
from fuzzingbook.InformationFlow import *
from fuzzingbook.GrammarFuzzer import *

In [80]:
# For example, strings that come from third-party input can be marked "LOW",
# meaning they have a low security level. The taint is passed to the tstr constructor:

thello = tstr('hello', taint='LOW') # 'hello' is introduced from user input
print(thello[:4])
print(thello.taint)

hell
LOW


In [81]:
# An ostr object extends tstr: besides a taint, it also tracks the original
# index of each character in the input string, which allows the provenance of
# individual characters to be followed precisely. Suppose a long string contains
# the password "joshua1234" at index 100.
# An ostr like the following then keeps that origin information:

secret = ostr("joshua1234", origin=100, taint='SECRET')
print(secret.origin)
print(secret.taint)

# ostr objects are compatible with Python strings, but string operations return
# ostr objects (together with the preserved origin and index information).
# An index of -1 means the character has no origin in the string that was
# passed to the ostr() constructor:

secret_substr = (secret[0:4] + "-" + secret[6:])
# NOTE(review): only the last expression of a cell is displayed, so the taint
# on the next line is evaluated but not shown in the recorded output.
secret_substr.taint
secret_substr.origin

[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
SECRET


[100, 101, 102, 103, -1, 106, 107, 108, 109]

# A Vulnerable Database

In [82]:
# Suppose we want to implement an in-memory database service in Python.
# What follows is a rather vulnerable attempt. It uses the data set below.

INVENTORY = """\
1997,van,Ford,E350
2000,car,Mercury,Cougar
1999,car,Chevy,Venture\
"""

# One comma-separated record per vehicle.
VEHICLES = INVENTORY.splitlines()

In [83]:
def sample_db():
    """Create a new ``DB`` holding a single ``inventory`` table.

    The table is declared with the columns ``year`` (int), ``kind`` (str),
    ``company`` (str) and ``model`` (str).

    Returns:
        The freshly created ``DB`` instance.
    """
    column_types = {'year': int, 'kind': str, 'company': str, 'model': str}
    database = DB()
    database.create_table('inventory', column_types)
    return database

# Build a sample database and inspect its inventory table
# (a pair of column declaration and row list).
db = sample_db()
db.table('inventory')

({'year': int, 'kind': str, 'company': str, 'model': str}, [])

In [84]:
# Unpack the table into its declaration and rows, then look up the entry
# that the declaration holds for the 'year' column.
db = sample_db()
decl, rows = db.table('inventory')
db.column(decl, 'year')

int

In [85]:
# NOTE(review): `some_db` is not used in this cell; the table below is created
# on the existing `db` instead — confirm which one was intended.
some_db = DB()

# First create a table with the proper data types in the database.
inventory_def = {'year': int, 'kind': str, 'company': str, 'model': str}
db.create_table('inventory', inventory_def)

# Pass every vehicle record to update_inventory (defined elsewhere;
# presumably it inserts the record as a row — see the dump below).
for V in VEHICLES:
    update_inventory(db, V)

# Show the raw contents of the database.
db.db

{'inventory': ({'year': int, 'kind': str, 'company': str, 'model': str},
  [{'year': 1997, 'kind': 'van', 'company': 'Ford', 'model': 'E350'},
   {'year': 2000, 'kind': 'car', 'company': 'Mercury', 'model': 'Cougar'},
   {'year': 1999, 'kind': 'car', 'company': 'Chevy', 'model': 'Venture'}])}

In [86]:
# Select two columns from every row of the inventory table.
db.sql('select year,kind from inventory')

[(1997, 'van'), (2000, 'car'), (1999, 'car')]

In [87]:
# Restrict the selected rows with a WHERE clause.
db.sql("select company,model from inventory where kind == 'car'")

[('Mercury', 'Cougar'), ('Chevy', 'Venture')]

In [88]:
# The select list may contain expressions, not only column names.
db.sql('select int(year)+10 from inventory')

[2007, 2010, 2009]

In [89]:
# Add a new row
db.sql("insert into inventory (year, kind, company, model) values (1, 'charriot', 'Rome', 'Quadriga')")
db.db

{'inventory': ({'year': int, 'kind': str, 'company': str, 'model': str},
  [{'year': 1997, 'kind': 'van', 'company': 'Ford', 'model': 'E350'},
   {'year': 2000, 'kind': 'car', 'company': 'Mercury', 'model': 'Cougar'},
   {'year': 1999, 'kind': 'car', 'company': 'Chevy', 'model': 'Venture'},
   {'year': 1, 'kind': 'charriot', 'company': 'Rome', 'model': 'Quadriga'}])}

In [90]:
# Delete rows
db.sql("delete from inventory where year < 1900")

'1 records were deleted'

Fuzzing SQL
定义一个sql的语法

In [91]:
# Feed 10 grammar-generated queries to the database.
# An SQLException is an expected rejection and is only reported; any other
# exception is treated as a crash: its traceback is printed and fuzzing stops.
gf = GrammarFuzzer(INVENTORY_GRAMMAR_F)
for _ in range(10):
    query = gf.fuzz()
    print(repr(query))
    try:
        res = db.sql(query)
        print(repr(res))
    except SQLException as e:
        print("> ", e)
    except Exception:
        # Catch Exception rather than using a bare `except:` so that
        # KeyboardInterrupt / SystemExit still interrupt the run.
        traceback.print_exc()
        break
    print()

'delete from inventory where y/u-l+f/y<Y(c)/A-H*q'
>  Invalid WHERE ('y/u-l+f/y<Y(c)/A-H*q')

"insert into inventory (G,Wmp,sl3hku3) values ('<','?')"
>  Column ('G') was not found

"insert into inventory (d0) values (',_G')"
>  Column ('d0') was not found

'select P*Q-w/x from inventory where X<j==:==j*r-f'
>  Invalid WHERE ('(X<j==:==j*r-f)')

'select a>F*i from inventory where Q/I-_+P*j>.'
>  Invalid WHERE ('(Q/I-_+P*j>.)')

'select (V-i<T/g) from inventory where T/r/G<FK(m)/(i)'
>  Invalid WHERE ('(T/r/G<FK(m)/(i))')

'select (((i))),_(S,_)/L-k<H(Sv,R,n,W,Y) from inventory'
>  Invalid WHERE ('((((i))),_(S,_)/L-k<H(Sv,R,n,W,Y))')

'select (N==c*U/P/y),i-e/n*y,T!=w,u from inventory'
>  Invalid WHERE ('((N==c*U/P/y),i-e/n*y,T!=w,u)')

'update inventory set _=B,n=v where o-p*k-J>T'
>  Column ('_') was not found

'select s from inventory where w4g4<.m(_)/_>t'
>  Invalid WHERE ('(w4g4<.m(_)/_>t)')



Fuzzing好像没有触发任何崩溃？这就说明程序没有任何问题吗？

In [92]:
# A query with a simple comparison in its WHERE clause.
db.sql('select year from inventory where year < 2000')

[1997, 1999]

In [93]:
# This works because `year - 1900 if year < 2000 else year - 2000` is a valid
# Python expression. (It is not a valid SQL expression, though.)
db.sql('select year - 1900 if year < 2000 else year - 2000 from inventory')

[97, 0, 99]

In [94]:
# The problem above is that there is no restriction on what a Python
# expression can do. What if a user tries the following?
# This may allow command injection, leading to remote code execution (RCE).
db.sql('select __import__("os").popen("pwd").read() from inventory')

['/home/lzy/Code/fuzzingbook\n',
 '/home/lzy/Code/fuzzingbook\n',
 '/home/lzy/Code/fuzzingbook\n']

In [95]:
# db.sql('select __import__("os").popen("system(/bin/sh)").read() from inventory')

# 跟踪字符串污点
污点跟踪可以在不同的粒度上进行。最简单的做法是只记录某个字符串片段来源于特定环境，且尚未经过去污处理。为此，我们只需用tstr把原始字符串连同环境标识符(即污点)包装起来，并让每个会产生新字符串片段的操作都返回tstr实例。属性taint保存一个标签，用于标识该实例派生自哪个环境。

In [96]:
# 例如，如果我们在tstr中wrap了“hello”，那么我们应该能够访问它的污点
thello: tstr = tstr('hello', taint='LOW')
thello.taint

'LOW'

In [97]:
# The repr() of a tainted string is itself tainted.
repr(thello).taint

'LOW'

In [98]:
# Run the first initializer immediately and record it in INITIALIZER_LIST.
# NOTE(review): informationflow_init_1 is defined elsewhere; presumably it
# installs tstr's wrapped string methods — confirm.
informationflow_init_1()
INITIALIZER_LIST = [informationflow_init_1]

def initialize():
    """Call every function registered in ``INITIALIZER_LIST``, in list order."""
    for init_fn in INITIALIZER_LIST:
        init_fn()

In [99]:
# Indexing a tainted string yields a tainted character.
thello = tstr('hello', taint='LOW')
thello[0].taint

'LOW'

In [100]:
# A slice of a tainted string keeps the taint.
thello[1:3].taint

'LOW'

In [101]:
# Appending a plain string to a tainted string keeps the taint.
(tstr('foo', taint='HIGH') + 'bar').taint

'HIGH'

In [102]:
# Prepending a plain string to a tainted string yields a tainted result.
('foo' + tstr('bar', taint='HIGH')).taint

'HIGH'

In [103]:
# In-place concatenation preserves the taint.
thello += ', world'
thello.taint

'LOW'

# 跟踪不信任的输入

我们定义了一个“更好的”TrustedDB，它只接受被污染为“TRUSTED”的字符串。

提供一个具有“未知”(即不存在)信任级别的字符串将导致TrustedDB失败

In [104]:
# Wrap the contents of the existing database in a TrustedDB.
bdb = TrustedDB(db.db)

from fuzzingbook.ExpectError import ExpectError

In [105]:
# A plain str (no taint at all) is rejected by TrustedDB.
with ExpectError():
    bdb.sql("select year from INVENTORY")

Traceback (most recent call last):
  File "/tmp/ipykernel_1387027/3935989889.py", line 2, in <cell line: 1>
    bdb.sql("select year from INVENTORY")
  File "/home/inspur/miniconda3/envs/fuzzingbook/lib/python3.10/site-packages/fuzzingbook/InformationFlow.py", line 763, in sql
    assert isinstance(s, tstr), "Need a tainted string"
AssertionError: Need a tainted string (expected)


In [106]:
# A tainted string whose taint is not 'TRUSTED' is rejected by TrustedDB.
bad_user_input = tstr('__import__("os").popen("ls").read()', taint='UNTRUSTED')
with ExpectError():
    bdb.sql(bad_user_input)

Traceback (most recent call last):
  File "/tmp/ipykernel_1387027/3307042773.py", line 3, in <cell line: 2>
    bdb.sql(bad_user_input)
  File "/home/inspur/miniconda3/envs/fuzzingbook/lib/python3.10/site-packages/fuzzingbook/InformationFlow.py", line 764, in sql
    assert s.taint == 'TRUSTED', "Need a string with trusted taint"
AssertionError: Need a string with trusted taint (expected)


因此，在计算过程中的某个地方，我们必须将“不可信的”输入转换为“可信的”字符串。这个过程被称为消毒处理(sanitization)。针对我们的目的，一个简单的消毒函数可以确保输入是一条简单的select查询，且只包含少量允许的字符(字母、数字、下划线、连字符、逗号、空格和圆括号，不包括引号);如果是这种情况，那么输入将获得一个新的“TRUSTED”污点。如果不是，则将字符串转换为(不可信的)空字符串;其他替代方法是抛出错误，或者转义、删除“不可信”字符。

In [107]:
import re

def sanitize(user_input):
    """Turn an untrusted query into a trusted one if it looks harmless.

    A query is accepted only if it is, in its entirety, of the form
    ``select <items> from <items>`` where ``<items>`` consists solely of
    letters, digits, ``_``, ``-``, ``,``, spaces and parentheses.

    Args:
        user_input: The query as a ``tstr``.

    Returns:
        A ``tstr`` with taint ``'TRUSTED'`` holding the query if it is
        accepted; otherwise an empty ``tstr`` with taint ``'UNTRUSTED'``.

    Raises:
        AssertionError: If ``user_input`` is not a ``tstr``.
    """
    assert isinstance(user_input, tstr)
    # Use fullmatch rather than match with `^...$`: `$` also matches just
    # before a trailing newline, which would let "...\n" slip through.
    if re.fullmatch(
            r'select +[-a-zA-Z0-9_, ()]+ from +[-a-zA-Z0-9_, ()]+', user_input):
        return tstr(user_input, taint='TRUSTED')
    return tstr('', taint='UNTRUSTED')

In [108]:
# A harmless query from an untrusted source passes sanitization unchanged.
good_user_input = tstr("select year,model from inventory", taint='UNTRUSTED')
sanitized_input = sanitize(good_user_input)
sanitized_input

'select year,model from inventory'

In [109]:
# The sanitized query carries the 'TRUSTED' taint.
sanitized_input.taint

'TRUSTED'

In [110]:
# TrustedDB executes the sanitized query.
bdb.sql(sanitized_input)

[(1997, 'E350'), (2000, 'Cougar'), (1999, 'Venture')]

In [111]:
# The malicious input is replaced by an empty string.
sanitized_input = sanitize(bad_user_input)
sanitized_input

''

In [112]:
# The empty replacement string keeps the 'UNTRUSTED' taint.
sanitized_input.taint

'UNTRUSTED'

In [113]:
# TrustedDB refuses to execute the still-untrusted string.
with ExpectError():
    bdb.sql(sanitized_input)

Traceback (most recent call last):
  File "/tmp/ipykernel_1387027/249000876.py", line 2, in <cell line: 1>
    bdb.sql(sanitized_input)
  File "/home/inspur/miniconda3/envs/fuzzingbook/lib/python3.10/site-packages/fuzzingbook/InformationFlow.py", line 764, in sql
    assert s.taint == 'TRUSTED', "Need a string with trusted taint"
AssertionError: Need a string with trusted taint (expected)


类似地，这种方法可以应用于检测和防止web安全中的sql注入。

# 污点感知的模糊测试